summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 19:53:07 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 21:24:33 -0800
commitd2f136073cac0c8379f357cd76b0ea163fd22a99 (patch)
treec49f1c7d7276ee7de66a8082b830ff4c22eb20ad
parent5fa8c88d6f369b3eceae7f34296b56cfd92d1f90 (diff)
downloadkafka-python-d2f136073cac0c8379f357cd76b0ea163fd22a99.tar.gz
Receive all available responses in client._poll
-rw-r--r--kafka/client_async.py7
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 3a1922e..88b8ec6 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -354,11 +354,12 @@ class KafkaClient(object):
ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
responses = []
- # list, not iterator, because inline callbacks may add to self._conns
for sock in ready:
conn = sockets[sock]
- response = conn.recv() # Note: conn.recv runs callbacks / errbacks
- if response:
+ while conn.in_flight_requests:
+ response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+ if not response:
+ break
responses.append(response)
return responses