summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8ce573b..1f0619b 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -37,6 +37,7 @@ class Fetcher(six.Iterator):
'max_partition_fetch_bytes': 1048576,
'check_crcs': True,
'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'api_version': (0, 8, 0),
}
def __init__(self, client, subscriptions, **configs):
@@ -531,7 +532,7 @@ class Fetcher(six.Iterator):
FetchRequests skipped if no leader, or node has requests in flight
Returns:
- dict: {node_id: FetchRequest, ...}
+ dict: {node_id: FetchRequest, ...} (version depends on api_version)
"""
# create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items()
@@ -564,9 +565,10 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
+ version = 1 if self.config['api_version'] >= (0, 9) else 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
- requests[node_id] = FetchRequest[0](
+ requests[node_id] = FetchRequest[version](
-1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],