summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 13:04:24 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-06 14:48:13 -0700
commit3d16f2ff5f75380c8a9fce846f35e92bb5bfb935 (patch)
tree169a0e740992d6bff7b4e46dbf047d14429b5d82
parent331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2 (diff)
downloadkafka-python-kafka-2136.tar.gz
KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)kafka-2136
-rw-r--r--kafka/consumer/fetcher.py6
-rw-r--r--kafka/producer/sender.py8
-rw-r--r--kafka/protocol/fetch.py26
-rw-r--r--kafka/protocol/legacy.py6
-rw-r--r--kafka/protocol/produce.py25
5 files changed, 57 insertions, 14 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8ce573b..1f0619b 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -37,6 +37,7 @@ class Fetcher(six.Iterator):
'max_partition_fetch_bytes': 1048576,
'check_crcs': True,
'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'api_version': (0, 8, 0),
}
def __init__(self, client, subscriptions, **configs):
@@ -531,7 +532,7 @@ class Fetcher(six.Iterator):
FetchRequests skipped if no leader, or node has requests in flight
Returns:
- dict: {node_id: FetchRequest, ...}
+ dict: {node_id: FetchRequest, ...} (version depends on api_version)
"""
# create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items()
@@ -564,9 +565,10 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
+ version = 1 if self.config['api_version'] >= (0, 9) else 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
- requests[node_id] = FetchRequest[0](
+ requests[node_id] = FetchRequest[version](
-1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 2201261..bf7c163 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -27,6 +27,7 @@ class Sender(threading.Thread):
'retries': 0,
'request_timeout_ms': 30000,
'client_id': 'kafka-python-' + __version__,
+ 'api_version': (0, 8, 0),
}
def __init__(self, client, metadata, accumulator, **configs):
@@ -232,7 +233,7 @@ class Sender(threading.Thread):
collated: {node_id: [RecordBatch]}
Returns:
- dict: {node_id: ProduceRequest}
+ dict: {node_id: ProduceRequest} (version depends on api_version)
"""
requests = {}
for node_id, batches in six.iteritems(collated):
@@ -245,7 +246,7 @@ class Sender(threading.Thread):
"""Create a produce request from the given record batches.
Returns:
- ProduceRequest
+ ProduceRequest (version depends on api_version)
"""
produce_records_by_partition = collections.defaultdict(dict)
for batch in batches:
@@ -256,7 +257,8 @@ class Sender(threading.Thread):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
- return ProduceRequest[0](
+ version = 1 if self.config['api_version'] >= (0, 9) else 0
+ return ProduceRequest[version](
required_acks=acks,
timeout=timeout,
topics=[(topic, list(partition_info.items()))
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index eeda4e7..6aba972 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -17,6 +17,21 @@ class FetchResponse_v0(Struct):
)
+class FetchResponse_v1(Struct):
+ API_KEY = 1
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('message_set', MessageSet)))))
+ )
+
+
class FetchRequest_v0(Struct):
API_KEY = 1
API_VERSION = 0
@@ -34,5 +49,12 @@ class FetchRequest_v0(Struct):
)
-FetchRequest = [FetchRequest_v0]
-FetchResponse = [FetchResponse_v0]
+class FetchRequest_v1(Struct):
+ API_KEY = 1
+ API_VERSION = 1
+ RESPONSE_TYPE = FetchResponse_v1
+ SCHEMA = FetchRequest_v0.SCHEMA
+
+
+FetchRequest = [FetchRequest_v0, FetchRequest_v1]
+FetchResponse = [FetchResponse_v0, FetchResponse_v1]
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 2eddf3b..08d2d01 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -336,11 +336,7 @@ class KafkaProtocol(object):
payloads: list of OffsetFetchRequestPayload
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
- if from_kafka:
- version = 1
- else:
- version = 0
-
+ version = 1 if from_kafka else 0
return kafka.protocol.commit.OffsetFetchRequest[version](
consumer_group=group,
topics=[(
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index 5753f64..e0b8622 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -16,6 +16,20 @@ class ProduceResponse_v0(Struct):
)
+class ProduceResponse_v1(Struct):
+ API_KEY = 0
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('offset', Int64))))),
+ ('throttle_time_ms', Int32)
+ )
+
+
class ProduceRequest_v0(Struct):
API_KEY = 0
API_VERSION = 0
@@ -31,5 +45,12 @@ class ProduceRequest_v0(Struct):
)
-ProduceRequest = [ProduceRequest_v0]
-ProduceResponse = [ProduceResponse_v0]
+class ProduceRequest_v1(Struct):
+ API_KEY = 0
+ API_VERSION = 1
+ RESPONSE_TYPE = ProduceResponse_v1
+ SCHEMA = ProduceRequest_v0.SCHEMA
+
+
+ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
+ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]