From 3d16f2ff5f75380c8a9fce846f35e92bb5bfb935 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 Apr 2016 13:04:24 -0700 Subject: KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms) --- kafka/consumer/fetcher.py | 6 ++++-- kafka/producer/sender.py | 8 +++++--- kafka/protocol/fetch.py | 26 ++++++++++++++++++++++++-- kafka/protocol/legacy.py | 6 +----- kafka/protocol/produce.py | 25 +++++++++++++++++++++++-- 5 files changed, 57 insertions(+), 14 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8ce573b..1f0619b 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -37,6 +37,7 @@ class Fetcher(six.Iterator): 'max_partition_fetch_bytes': 1048576, 'check_crcs': True, 'iterator_refetch_records': 1, # undocumented -- interface may change + 'api_version': (0, 8, 0), } def __init__(self, client, subscriptions, **configs): @@ -531,7 +532,7 @@ class Fetcher(six.Iterator): FetchRequests skipped if no leader, or node has requests in flight Returns: - dict: {node_id: FetchRequest, ...} + dict: {node_id: FetchRequest, ...} (version depends on api_version) """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() @@ -564,9 +565,10 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) + version = 1 if self.config['api_version'] >= (0, 9) else 0 requests = {} for node_id, partition_data in six.iteritems(fetchable): - requests[node_id] = FetchRequest[0]( + requests[node_id] = FetchRequest[version]( -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 2201261..bf7c163 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -27,6 +27,7 @@ class Sender(threading.Thread): 'retries': 0, 'request_timeout_ms': 30000, 'client_id': 'kafka-python-' + __version__, + 'api_version': (0, 8, 0), } def __init__(self, client, metadata, accumulator, **configs): @@ -232,7 +233,7 @@ class Sender(threading.Thread): collated: {node_id: [RecordBatch]} Returns: - dict: {node_id: ProduceRequest} + dict: {node_id: ProduceRequest} (version depends on api_version) """ requests = {} for node_id, batches in six.iteritems(collated): @@ -245,7 +246,7 @@ class Sender(threading.Thread): """Create a produce request from the given record batches. Returns: - ProduceRequest + ProduceRequest (version depends on api_version) """ produce_records_by_partition = collections.defaultdict(dict) for batch in batches: @@ -256,7 +257,8 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - return ProduceRequest[0]( + version = 1 if self.config['api_version'] >= (0, 9) else 0 + return ProduceRequest[version]( required_acks=acks, timeout=timeout, topics=[(topic, list(partition_info.items())) diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index eeda4e7..6aba972 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -17,6 +17,21 @@ class FetchResponse_v0(Struct): ) +class FetchResponse_v1(Struct): + API_KEY = 1 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('message_set', MessageSet))))) + ) + + class FetchRequest_v0(Struct): API_KEY = 1 API_VERSION = 0 @@ -34,5 +49,12 @@ class FetchRequest_v0(Struct): ) -FetchRequest = [FetchRequest_v0] -FetchResponse = [FetchResponse_v0] +class FetchRequest_v1(Struct): + API_KEY = 1 + API_VERSION = 1 + RESPONSE_TYPE = FetchResponse_v1 + SCHEMA = FetchRequest_v0.SCHEMA + + +FetchRequest = [FetchRequest_v0, FetchRequest_v1] +FetchResponse = [FetchResponse_v0, FetchResponse_v1] diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 2eddf3b..08d2d01 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -336,11 +336,7 @@ class KafkaProtocol(object): payloads: list of OffsetFetchRequestPayload from_kafka: bool, default False, set True for Kafka-committed offsets """ - if from_kafka: - version = 1 - else: - version = 0 - + version = 1 if from_kafka else 0 return kafka.protocol.commit.OffsetFetchRequest[version]( consumer_group=group, topics=[( diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 5753f64..e0b8622 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -16,6 +16,20 @@ class ProduceResponse_v0(Struct): ) +class ProduceResponse_v1(Struct): + API_KEY = 0 + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('offset', Int64))))), + ('throttle_time_ms', Int32) + ) + + class ProduceRequest_v0(Struct): API_KEY = 0 API_VERSION = 0 @@ -31,5 +45,12 @@ class ProduceRequest_v0(Struct): ) -ProduceRequest = [ProduceRequest_v0] -ProduceResponse = [ProduceResponse_v0] +class ProduceRequest_v1(Struct): + API_KEY = 0 + API_VERSION = 1 + RESPONSE_TYPE = ProduceResponse_v1 + SCHEMA = ProduceRequest_v0.SCHEMA + + +ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1] +ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1] -- cgit v1.2.1