summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2016-02-02 07:55:39 -0800
committer Dana Powers <dana.powers@gmail.com> 2016-02-02 07:55:39 -0800
commit 22a1b6dc70736089a96338602181da934621a9b9 (patch)
tree 27f8e75cfa0c75dea8835a6abbddc10bed7a6224
parent 2a783d047aa97cef80ba964cdd2d8dcaaebb4f66 (diff)
download kafka-python-22a1b6dc70736089a96338602181da934621a9b9.tar.gz
Fix accumulator bug: expired batches should be removed from the internal queue
-rw-r--r-- kafka/producer/record_accumulator.py 10
1 file changed, 10 insertions, 0 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 6a762eb..c62926d 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -243,6 +243,7 @@ class RecordAccumulator(object):
list of RecordBatch that were expired
"""
expired_batches = []
+ to_remove = []
count = 0
for tp, dq in six.iteritems(self._batches):
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
@@ -254,11 +255,20 @@ class RecordAccumulator(object):
if batch.maybe_expire(request_timeout_ms,
self.config['linger_ms']):
expired_batches.append(batch)
+ to_remove.append(batch)
count += 1
self.deallocate(batch)
elif not batch.in_retry():
break
+ # Python does not allow us to mutate the dq during iteration
+ # Assuming expired batches are infrequent, this is better than
+ # creating a new copy of the deque for iteration on every loop
+ if to_remove:
+ for batch in to_remove:
+ dq.remove(batch)
+ to_remove = []
+
if expired_batches:
log.debug("Expired %d batches in accumulator", count) # trace