summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-02-05 17:35:44 -0800
committerDana Powers <dana.powers@gmail.com>2018-02-05 17:35:44 -0800
commitaa838e15ff33b4f3546c0f68d9ddb8638fd637ee (patch)
treef63ee51d9362cd42d756378b442018bf461de742
parent441aeb864519d2f574650e24a327423308adca03 (diff)
downloadkafka-python-protocol_1_0_0.tar.gz
Add Request/Response structs for kafka broker 1.0.0protocol_1_0_0
-rw-r--r--kafka/conn.py1
-rw-r--r--kafka/protocol/admin.py41
-rw-r--r--kafka/protocol/fetch.py28
-rw-r--r--kafka/protocol/group.py2
-rw-r--r--kafka/protocol/metadata.py43
-rw-r--r--kafka/protocol/produce.py88
6 files changed, 167 insertions, 36 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 5ff27d5..4fe5e21 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -818,6 +818,7 @@ class BrokerConnection(object):
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
+ ((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 09746bf..b787c5f 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -286,6 +286,12 @@ class SaslHandShakeResponse_v0(Response):
)
+class SaslHandShakeResponse_v1(Response):
+ API_KEY = 17
+ API_VERSION = 1
+ SCHEMA = SaslHandShakeResponse_v0.SCHEMA
+
+
class SaslHandShakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
@@ -294,5 +300,36 @@ class SaslHandShakeRequest_v0(Request):
('mechanism', String('utf-8'))
)
-SaslHandShakeRequest = [SaslHandShakeRequest_v0]
-SaslHandShakeResponse = [SaslHandShakeResponse_v0]
+
+class SaslHandShakeRequest_v1(Request):
+ API_KEY = 17
+ API_VERSION = 1
+ RESPONSE_TYPE = SaslHandShakeResponse_v1
+ SCHEMA = SaslHandShakeRequest_v0.SCHEMA
+
+
+SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1]
+SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1]
+
+
+class SaslAuthenticateResponse_v0(Request):
+ API_KEY = 36
+ API_VERSION = 0
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('error_message', String('utf-8')),
+ ('sasl_auth_bytes', Bytes)
+ )
+
+
+class SaslAuthenticateRequest_v0(Request):
+ API_KEY = 36
+ API_VERSION = 0
+ RESPONSE_TYPE = SaslAuthenticateResponse_v0
+ SCHEMA = Schema(
+ ('sasl_auth_bytes', Bytes)
+ )
+
+
+SaslAuthenticateRequest = [SaslAuthenticateRequest_v0]
+SaslAuthenticateResponse = [SaslAuthenticateResponse_v0]
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 0b03845..5fc17e0 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -84,6 +84,16 @@ class FetchResponse_v5(Response):
)
+class FetchResponse_v6(Response):
+ """
+ Same as FetchResponse_v5. The version number is bumped up to indicate that the client supports KafkaStorageException.
+ The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
+ """
+ API_KEY = 1
+ API_VERSION = 6
+ SCHEMA = FetchResponse_v5.SCHEMA
+
+
class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
@@ -174,11 +184,25 @@ class FetchRequest_v5(Request):
)
+class FetchRequest_v6(Request):
+ """
+ The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
+ The version number is bumped up to indicate that the client supports KafkaStorageException.
+ The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
+ """
+ API_KEY = 1
+ API_VERSION = 6
+ RESPONSE_TYPE = FetchResponse_v6
+ SCHEMA = FetchRequest_v5.SCHEMA
+
+
FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
- FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
+ FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
+ FetchRequest_v6
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
- FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
+ FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
+ FetchResponse_v6
]
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
index c6acca8..db84427 100644
--- a/kafka/protocol/group.py
+++ b/kafka/protocol/group.py
@@ -87,7 +87,7 @@ JoinGroupRequest = [
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2
]
JoinGroupResponse = [
- JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1
+ JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2
]
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index 2be8209..2aafdd3 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -102,6 +102,32 @@ class MetadataResponse_v4(Response):
SCHEMA = MetadataResponse_v3.SCHEMA
+class MetadataResponse_v5(Response):
+ API_KEY = 3
+ API_VERSION = 5
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('brokers', Array(
+ ('node_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32),
+ ('rack', String('utf-8')))),
+ ('cluster_id', String('utf-8')),
+ ('controller_id', Int32),
+ ('topics', Array(
+ ('error_code', Int16),
+ ('topic', String('utf-8')),
+ ('is_internal', Boolean),
+ ('partitions', Array(
+ ('error_code', Int16),
+ ('partition', Int32),
+ ('leader', Int32),
+ ('replicas', Array(Int32)),
+ ('isr', Array(Int32)),
+ ('offline_replicas', Array(Int32))))))
+ )
+
+
class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
@@ -151,11 +177,24 @@ class MetadataRequest_v4(Request):
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
+class MetadataRequest_v5(Request):
+ """
+ The v5 metadata request is the same as v4.
+ An additional field for offline_replicas has been added to the v5 metadata response
+ """
+ API_KEY = 3
+ API_VERSION = 5
+ RESPONSE_TYPE = MetadataResponse_v5
+ SCHEMA = MetadataRequest_v4.SCHEMA
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
+
+
MetadataRequest = [
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
- MetadataRequest_v3, MetadataRequest_v4
+ MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5
]
MetadataResponse = [
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
- MetadataResponse_v3, MetadataResponse_v4
+ MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5
]
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index 34ff949..5fbddec 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -52,52 +52,67 @@ class ProduceResponse_v3(Response):
SCHEMA = ProduceResponse_v2.SCHEMA
-class ProduceRequest_v0(Request):
+class ProduceResponse_v4(Response):
+ """
+ The version number is bumped up to indicate that the client supports KafkaStorageException.
+ The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+ """
API_KEY = 0
- API_VERSION = 0
- RESPONSE_TYPE = ProduceResponse_v0
+ API_VERSION = 4
+ SCHEMA = ProduceResponse_v3.SCHEMA
+
+
+class ProduceResponse_v5(Response):
+ API_KEY = 0
+ API_VERSION = 5
SCHEMA = Schema(
- ('required_acks', Int16),
- ('timeout', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
- ('messages', Bytes)))))
+ ('error_code', Int16),
+ ('offset', Int64),
+ ('timestamp', Int64),
+ ('log_start_offset', Int64))))),
+ ('throttle_time_ms', Int32)
)
+
+class ProduceRequest(Request):
+ API_KEY = 0
+
def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True
-class ProduceRequest_v1(Request):
- API_KEY = 0
+class ProduceRequest_v0(ProduceRequest):
+ API_VERSION = 0
+ RESPONSE_TYPE = ProduceResponse_v0
+ SCHEMA = Schema(
+ ('required_acks', Int16),
+ ('timeout', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('messages', Bytes)))))
+ )
+
+
+class ProduceRequest_v1(ProduceRequest):
API_VERSION = 1
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA
- def expect_response(self):
- if self.required_acks == 0: # pylint: disable=no-member
- return False
- return True
-
-
-class ProduceRequest_v2(Request):
- API_KEY = 0
+class ProduceRequest_v2(ProduceRequest):
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
SCHEMA = ProduceRequest_v1.SCHEMA
- def expect_response(self):
- if self.required_acks == 0: # pylint: disable=no-member
- return False
- return True
-
-class ProduceRequest_v3(Request):
- API_KEY = 0
+class ProduceRequest_v3(ProduceRequest):
API_VERSION = 3
RESPONSE_TYPE = ProduceResponse_v3
SCHEMA = Schema(
@@ -111,17 +126,32 @@ class ProduceRequest_v3(Request):
('messages', Bytes)))))
)
- def expect_response(self):
- if self.required_acks == 0: # pylint: disable=no-member
- return False
- return True
+
+class ProduceRequest_v4(ProduceRequest):
+ """
+ The version number is bumped up to indicate that the client supports KafkaStorageException.
+ The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+ """
+ API_VERSION = 4
+ RESPONSE_TYPE = ProduceResponse_v4
+ SCHEMA = ProduceRequest_v3.SCHEMA
+
+
+class ProduceRequest_v5(ProduceRequest):
+ """
+ Same as v4. The version number is bumped since the v5 response includes an additional
+ partition level field: the log_start_offset.
+ """
+ API_VERSION = 5
+ RESPONSE_TYPE = ProduceResponse_v5
+ SCHEMA = ProduceRequest_v4.SCHEMA
ProduceRequest = [
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
- ProduceRequest_v3
+ ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
]
ProduceResponse = [
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
- ProduceResponse_v2
+ ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
]