summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-06 11:47:41 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-06 11:47:41 -0700
commit78ad43600c469c05a5b0e32c6be27048749cd58e (patch)
tree8e5b4da7c07101eb96ff90f7fc8da38ddd25c34c
parent3ef15f9d60af01ce397737b4d356618385b8884f (diff)
downloadkafka-python-fetch.tar.gz
Dont send FetchRequest for (obviously) pending datafetch
-rw-r--r--kafka/consumer/fetcher.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 71d2ed2..4769c2e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -537,15 +537,24 @@ class Fetcher(six.Iterator):
# which can be passed to FetchRequest() via .items()
fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
+ # avoid re-fetching pending offsets
+ pending = set()
+ for fetch_offset, tp, _ in self._records:
+ pending.add((tp, fetch_offset))
+
for partition in self._subscriptions.fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
+ position = self._subscriptions.assignment[partition].position
+
+ # fetch if there is a leader, no in-flight requests, and no _records
if node_id is None or node_id == -1:
log.debug("No leader found for partition %s."
" Requesting metadata update", partition)
self._client.cluster.request_update()
- elif self._client.in_flight_request_count(node_id) == 0:
- # fetch if there is a leader and no in-flight requests
- position = self._subscriptions.assignment[partition].position
+
+ elif ((partition, position) not in pending and
+ self._client.in_flight_request_count(node_id) == 0):
+
partition_info = (
partition.partition,
position,