summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-14 00:29:08 -0700
committerDana Powers <dana.powers@gmail.com>2016-03-14 00:29:08 -0700
commit6ded42f3c4caf4c753f19776d9e2dfaceb484ebb (patch)
treed1963fd24664ed909d31e6c4a6ba442f89a45c1f /kafka
parent0330036bef996815c5ef384ab6803697816e4189 (diff)
downloadkafka-python-6ded42f3c4caf4c753f19776d9e2dfaceb484ebb.tar.gz
Fix producer threading bug that could crash sender (dict changing during iteration)accumulator_bugfix
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/record_accumulator.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 958d207..19dc199 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -248,11 +248,12 @@ class RecordAccumulator(object):
expired_batches = []
to_remove = []
count = 0
- for tp, dq in six.iteritems(self._batches):
+ for tp in list(self._batches.keys()):
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
with self._tp_locks[tp]:
# iterate over the batches and expire them if they have stayed
# in accumulator for more than request_timeout_ms
+ dq = self._batches[tp]
for batch in dq:
# check if the batch is expired
if batch.maybe_expire(request_timeout_ms,
@@ -367,8 +368,9 @@ class RecordAccumulator(object):
def has_unsent(self):
"""Return whether there is any unsent record in the accumulator."""
- for tp, dq in six.iteritems(self._batches):
+ for tp in list(self._batches.keys()):
with self._tp_locks[tp]:
+ dq = self._batches[tp]
if len(dq):
return True
return False