summaryrefslogtreecommitdiff
path: root/kafka/structs.py
blob: 62f36dd4cc5a2d7985b0f1dabd8da382402cd139 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from __future__ import absolute_import

from collections import namedtuple


#  SimpleClient Payload Structs - Deprecated

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
# Metadata API request record; its only field is `topics`.
MetadataRequest = namedtuple("MetadataRequest", ("topics",))

# Metadata API response record: (brokers, topics).
MetadataResponse = namedtuple(
    "MetadataResponse",
    ("brokers", "topics"),
)

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
# Consumer metadata request record; its only field is `groups`.
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", ("groups",))

# Consumer metadata response record: (error, nodeId, host, port).
ConsumerMetadataResponse = namedtuple(
    "ConsumerMetadataResponse",
    (
        "error",
        "nodeId",
        "host",
        "port",
    ),
)

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
# Produce API request payload: (topic, partition, messages).
ProduceRequestPayload = namedtuple(
    "ProduceRequestPayload",
    ("topic", "partition", "messages"),
)

# Produce API response payload: (topic, partition, error, offset).
ProduceResponsePayload = namedtuple(
    "ProduceResponsePayload",
    (
        "topic",
        "partition",
        "error",
        "offset",
    ),
)

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
# Fetch API request payload: (topic, partition, offset, max_bytes).
FetchRequestPayload = namedtuple(
    "FetchRequestPayload",
    (
        "topic",
        "partition",
        "offset",
        "max_bytes",
    ),
)

# Fetch API response payload:
# (topic, partition, error, highwaterMark, messages).
FetchResponsePayload = namedtuple(
    "FetchResponsePayload",
    (
        "topic",
        "partition",
        "error",
        "highwaterMark",
        "messages",
    ),
)

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
# Offset API request payload: (topic, partition, time, max_offsets).
OffsetRequestPayload = namedtuple(
    "OffsetRequestPayload",
    (
        "topic",
        "partition",
        "time",
        "max_offsets",
    ),
)

# List-offset request payload: (topic, partition, time).
ListOffsetRequestPayload = namedtuple(
    "ListOffsetRequestPayload",
    ("topic", "partition", "time"),
)

# Offset API response payload: (topic, partition, error, offsets).
OffsetResponsePayload = namedtuple(
    "OffsetResponsePayload",
    (
        "topic",
        "partition",
        "error",
        "offsets",
    ),
)

# List-offset response payload:
# (topic, partition, error, timestamp, offset).
ListOffsetResponsePayload = namedtuple(
    "ListOffsetResponsePayload",
    (
        "topic",
        "partition",
        "error",
        "timestamp",
        "offset",
    ),
)

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
# Offset-commit request payload: (topic, partition, offset, metadata).
OffsetCommitRequestPayload = namedtuple(
    "OffsetCommitRequestPayload",
    (
        "topic",
        "partition",
        "offset",
        "metadata",
    ),
)

# Offset-commit response payload: (topic, partition, error).
OffsetCommitResponsePayload = namedtuple(
    "OffsetCommitResponsePayload",
    ("topic", "partition", "error"),
)

# Offset-fetch request payload: (topic, partition).
OffsetFetchRequestPayload = namedtuple(
    "OffsetFetchRequestPayload",
    ("topic", "partition"),
)

# Offset-fetch response payload:
# (topic, partition, offset, metadata, error).
OffsetFetchResponsePayload = namedtuple(
    "OffsetFetchResponsePayload",
    (
        "topic",
        "partition",
        "offset",
        "metadata",
        "error",
    ),
)



# Other useful structs
# (topic, partition) pair; like any namedtuple it is hashable when its
# values are, so it can serve as a dict key or set member.
TopicPartition = namedtuple(
    "TopicPartition",
    ("topic", "partition"),
)

# Broker descriptor record: (nodeId, host, port, rack).
BrokerMetadata = namedtuple(
    "BrokerMetadata",
    (
        "nodeId",
        "host",
        "port",
        "rack",
    ),
)

# Partition descriptor record:
# (topic, partition, leader, replicas, isr, error).
PartitionMetadata = namedtuple(
    "PartitionMetadata",
    (
        "topic",
        "partition",
        "leader",
        "replicas",
        "isr",
        "error",
    ),
)

# (offset, metadata) pair.
OffsetAndMetadata = namedtuple(
    "OffsetAndMetadata",
    ("offset", "metadata"),
)

# (offset, timestamp) pair.
OffsetAndTimestamp = namedtuple(
    "OffsetAndTimestamp",
    ("offset", "timestamp"),
)


# Deprecated structs
# Deprecated (offset, message) pair.
OffsetAndMessage = namedtuple(
    "OffsetAndMessage",
    ("offset", "message"),
)

# Deprecated message record: (magic, attributes, key, value).
Message = namedtuple(
    "Message",
    (
        "magic",
        "attributes",
        "key",
        "value",
    ),
)

# Deprecated message record: (topic, partition, offset, key, value).
KafkaMessage = namedtuple(
    "KafkaMessage",
    (
        "topic",
        "partition",
        "offset",
        "key",
        "value",
    ),
)


# Retry policy record for the async producer.
# `limit` is an int >= 0; a limit of 0 means no retries.
RetryOptions = namedtuple(
    "RetryOptions",
    (
        "limit",
        "backoff_ms",
        "retry_on_timeouts",
    ),
)


# Support legacy imports from kafka.common
from kafka.errors import *