diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-02 07:55:39 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-02 07:55:39 -0800 |
commit | 22a1b6dc70736089a96338602181da934621a9b9 (patch) | |
tree | 27f8e75cfa0c75dea8835a6abbddc10bed7a6224 | |
parent | 2a783d047aa97cef80ba964cdd2d8dcaaebb4f66 (diff) | |
download | kafka-python-22a1b6dc70736089a96338602181da934621a9b9.tar.gz |
Fix accumulator bug: expired batches should be removed from the internal queue
-rw-r--r-- | kafka/producer/record_accumulator.py | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 6a762eb..c62926d 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -243,6 +243,7 @@ class RecordAccumulator(object): list of RecordBatch that were expired """ expired_batches = [] + to_remove = [] count = 0 for tp, dq in six.iteritems(self._batches): assert tp in self._tp_locks, 'TopicPartition not in locks dict' @@ -254,11 +255,20 @@ class RecordAccumulator(object): if batch.maybe_expire(request_timeout_ms, self.config['linger_ms']): expired_batches.append(batch) + to_remove.append(batch) count += 1 self.deallocate(batch) elif not batch.in_retry(): break + # Python does not allow us to mutate the dq during iteration + # Assuming expired batches are infrequent, this is better than + # creating a new copy of the deque for iteration on every loop + if to_remove: + for batch in to_remove: + dq.remove(batch) + to_remove = [] + if expired_batches: log.debug("Expired %d batches in accumulator", count) # trace |