diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-02-05 17:35:44 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-02-05 17:35:44 -0800 |
commit | aa838e15ff33b4f3546c0f68d9ddb8638fd637ee (patch) | |
tree | f63ee51d9362cd42d756378b442018bf461de742 | |
parent | 441aeb864519d2f574650e24a327423308adca03 (diff) | |
download | kafka-python-protocol_1_0_0.tar.gz |
Add Request/Response structs for kafka broker 1.0.0protocol_1_0_0
-rw-r--r-- | kafka/conn.py | 1 | ||||
-rw-r--r-- | kafka/protocol/admin.py | 41 | ||||
-rw-r--r-- | kafka/protocol/fetch.py | 28 | ||||
-rw-r--r-- | kafka/protocol/group.py | 2 | ||||
-rw-r--r-- | kafka/protocol/metadata.py | 43 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 88 |
6 files changed, 167 insertions, 36 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 5ff27d5..4fe5e21 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -818,6 +818,7 @@ class BrokerConnection(object): # in reverse order. As soon as we find one that works, return it test_cases = [ # format (<broker version>, <needed struct>) + ((1, 0, 0), MetadataRequest[5]), ((0, 11, 0), MetadataRequest[4]), ((0, 10, 2), OffsetFetchRequest[2]), ((0, 10, 1), MetadataRequest[2]), diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 09746bf..b787c5f 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -286,6 +286,12 @@ class SaslHandShakeResponse_v0(Response): ) +class SaslHandShakeResponse_v1(Response): + API_KEY = 17 + API_VERSION = 1 + SCHEMA = SaslHandShakeResponse_v0.SCHEMA + + class SaslHandShakeRequest_v0(Request): API_KEY = 17 API_VERSION = 0 @@ -294,5 +300,36 @@ class SaslHandShakeRequest_v0(Request): ('mechanism', String('utf-8')) ) -SaslHandShakeRequest = [SaslHandShakeRequest_v0] -SaslHandShakeResponse = [SaslHandShakeResponse_v0] + +class SaslHandShakeRequest_v1(Request): + API_KEY = 17 + API_VERSION = 1 + RESPONSE_TYPE = SaslHandShakeResponse_v1 + SCHEMA = SaslHandShakeRequest_v0.SCHEMA + + +SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1] +SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1] + + +class SaslAuthenticateResponse_v0(Request): + API_KEY = 36 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('sasl_auth_bytes', Bytes) + ) + + +class SaslAuthenticateRequest_v0(Request): + API_KEY = 36 + API_VERSION = 0 + RESPONSE_TYPE = SaslAuthenticateResponse_v0 + SCHEMA = Schema( + ('sasl_auth_bytes', Bytes) + ) + + +SaslAuthenticateRequest = [SaslAuthenticateRequest_v0] +SaslAuthenticateResponse = [SaslAuthenticateResponse_v0] diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index 0b03845..5fc17e0 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -84,6 +84,16 @@ class FetchResponse_v5(Response): ) +class FetchResponse_v6(Response): + """ + Same as FetchResponse_v5. The version number is bumped up to indicate that the client supports KafkaStorageException. + The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5 + """ + API_KEY = 1 + API_VERSION = 6 + SCHEMA = FetchResponse_v5.SCHEMA + + class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 @@ -174,11 +184,25 @@ class FetchRequest_v5(Request): ) +class FetchRequest_v6(Request): + """ + The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5. + The version number is bumped up to indicate that the client supports KafkaStorageException. + The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5 + """ + API_KEY = 1 + API_VERSION = 6 + RESPONSE_TYPE = FetchResponse_v6 + SCHEMA = FetchRequest_v5.SCHEMA + + FetchRequest = [ FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, - FetchRequest_v3, FetchRequest_v4, FetchRequest_v5 + FetchRequest_v3, FetchRequest_v4, FetchRequest_v5, + FetchRequest_v6 ] FetchResponse = [ FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, - FetchResponse_v3, FetchResponse_v4, FetchResponse_v5 + FetchResponse_v3, FetchResponse_v4, FetchResponse_v5, + FetchResponse_v6 ] diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index c6acca8..db84427 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -87,7 +87,7 @@ JoinGroupRequest = [ JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2 ] JoinGroupResponse = [ - JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1 + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2 ] diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 2be8209..2aafdd3 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -102,6 +102,32 @@ class MetadataResponse_v4(Response): SCHEMA = MetadataResponse_v3.SCHEMA +class MetadataResponse_v5(Response): + API_KEY = 3 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('brokers', Array( + ('node_id', Int32), + ('host', String('utf-8')), + ('port', Int32), + ('rack', String('utf-8')))), + ('cluster_id', String('utf-8')), + ('controller_id', Int32), + ('topics', Array( + ('error_code', Int16), + ('topic', String('utf-8')), + ('is_internal', Boolean), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader', Int32), + ('replicas', Array(Int32)), + ('isr', Array(Int32)), + ('offline_replicas', Array(Int32)))))) + ) + + class MetadataRequest_v0(Request): API_KEY = 3 API_VERSION = 0 @@ -151,11 +177,24 @@ class MetadataRequest_v4(Request): NO_TOPICS = None # Empty array (len 0) for topics returns no topics +class MetadataRequest_v5(Request): + """ + The v5 metadata request is the same as v4. + An additional field for offline_replicas has been added to the v5 metadata response + """ + API_KEY = 3 + API_VERSION = 5 + RESPONSE_TYPE = MetadataResponse_v5 + SCHEMA = MetadataRequest_v4.SCHEMA + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics + + MetadataRequest = [ MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2, - MetadataRequest_v3, MetadataRequest_v4 + MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5 ] MetadataResponse = [ MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, - MetadataResponse_v3, MetadataResponse_v4 + MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5 ] diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 34ff949..5fbddec 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -52,52 +52,67 @@ class ProduceResponse_v3(Response): SCHEMA = ProduceResponse_v2.SCHEMA -class ProduceRequest_v0(Request): +class ProduceResponse_v4(Response): + """ + The version number is bumped up to indicate that the client supports KafkaStorageException. + The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3 + """ API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = ProduceResponse_v0 + API_VERSION = 4 + SCHEMA = ProduceResponse_v3.SCHEMA + + +class ProduceResponse_v5(Response): + API_KEY = 0 + API_VERSION = 5 SCHEMA = Schema( - ('required_acks', Int16), - ('timeout', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', Bytes))))) + ('error_code', Int16), + ('offset', Int64), + ('timestamp', Int64), + ('log_start_offset', Int64))))), + ('throttle_time_ms', Int32) ) + +class ProduceRequest(Request): + API_KEY = 0 + def expect_response(self): if self.required_acks == 0: # pylint: disable=no-member return False return True -class ProduceRequest_v1(Request): - API_KEY = 0 +class ProduceRequest_v0(ProduceRequest): + API_VERSION = 0 + RESPONSE_TYPE = ProduceResponse_v0 + SCHEMA = Schema( + ('required_acks', Int16), + ('timeout', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('messages', Bytes))))) + ) + + +class ProduceRequest_v1(ProduceRequest): API_VERSION = 1 RESPONSE_TYPE = ProduceResponse_v1 SCHEMA = ProduceRequest_v0.SCHEMA - def expect_response(self): - if self.required_acks == 0: # pylint: disable=no-member - return False - return True - - -class ProduceRequest_v2(Request): - API_KEY = 0 +class ProduceRequest_v2(ProduceRequest): API_VERSION = 2 RESPONSE_TYPE = ProduceResponse_v2 SCHEMA = ProduceRequest_v1.SCHEMA - def expect_response(self): - if self.required_acks == 0: # pylint: disable=no-member - return False - return True - -class ProduceRequest_v3(Request): - API_KEY = 0 +class ProduceRequest_v3(ProduceRequest): API_VERSION = 3 RESPONSE_TYPE = ProduceResponse_v3 SCHEMA = Schema( @@ -111,17 +126,32 @@ class ProduceRequest_v3(Request): ('messages', Bytes))))) ) - def expect_response(self): - if self.required_acks == 0: # pylint: disable=no-member - return False - return True + +class ProduceRequest_v4(ProduceRequest): + """ + The version number is bumped up to indicate that the client supports KafkaStorageException. + The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3 + """ + API_VERSION = 4 + RESPONSE_TYPE = ProduceResponse_v4 + SCHEMA = ProduceRequest_v3.SCHEMA + + +class ProduceRequest_v5(ProduceRequest): + """ + Same as v4. The version number is bumped since the v5 response includes an additional + partition level field: the log_start_offset. + """ + API_VERSION = 5 + RESPONSE_TYPE = ProduceResponse_v5 + SCHEMA = ProduceRequest_v4.SCHEMA ProduceRequest = [ ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2, - ProduceRequest_v3 + ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5 ] ProduceResponse = [ ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2, - ProduceResponse_v2 + ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5 ] |