summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRauli Ikonen <rauli@aiven.io>2020-11-15 20:19:58 +0200
committerGitHub <noreply@github.com>2020-11-15 10:19:58 -0800
commit6c87155bbd855f6bba1ba30b2b6227e66ea79baa (patch)
treea4472d0fa125e75cd5ce8709db1d5bb81e0e5830
parent12325c09baefae2396f1083bc8b037603721198c (diff)
downloadkafka-python-6c87155bbd855f6bba1ba30b2b6227e66ea79baa.tar.gz
KafkaConsumer: Exit poll if consumer is closed (#2152)
-rw-r--r--kafka/consumer/group.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 26408c3..4fd57ae 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -651,7 +651,7 @@ class KafkaConsumer(six.Iterator):
# Poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
- while True:
+ while not self._closed:
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
if records:
return records
@@ -660,7 +660,9 @@ class KafkaConsumer(six.Iterator):
remaining = timeout_ms - elapsed_ms
if remaining <= 0:
- return {}
+ break
+
+ return {}
def _poll_once(self, timeout_ms, max_records, update_offsets=True):
"""Do one round of polling. In addition to checking for new data, this does