summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-02-08 11:59:54 -0800
committerGitHub <noreply@github.com>2018-02-08 11:59:54 -0800
commit41aa0342f8bfa6f2ced61be7a8b0f2cd28fbb671 (patch)
treeff11cdf2eb24c41e764dadfecfcfcb3f79d884c5
parent68068cac13c4cacbe3122cdcba39aa0d3c060b99 (diff)
downloadkafka-python-41aa0342f8bfa6f2ced61be7a8b0f2cd28fbb671.tar.gz
Fix pending completion IndexError bug caused by multiple threads (#1372)
-rw-r--r--kafka/client_async.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 4962d9f..24a5bef 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -665,8 +665,14 @@ class KafkaClient(object):
def _fire_pending_completed_requests(self):
responses = []
- while self._pending_completion:
- response, future = self._pending_completion.popleft()
+ while True:
+ try:
+ # We rely on deque.popleft remaining threadsafe
+ # to allow both the heartbeat thread and the main thread
+ # to process responses
+ response, future = self._pending_completion.popleft()
+ except IndexError:
+ break
future.success(response)
responses.append(response)
return responses