diff options
author | Rauli Ikonen <rauli@aiven.io> | 2020-11-15 20:19:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-15 10:19:58 -0800 |
commit | 6c87155bbd855f6bba1ba30b2b6227e66ea79baa (patch) | |
tree | a4472d0fa125e75cd5ce8709db1d5bb81e0e5830 | |
parent | 12325c09baefae2396f1083bc8b037603721198c (diff) | |
download | kafka-python-6c87155bbd855f6bba1ba30b2b6227e66ea79baa.tar.gz |
KafkaConsumer: Exit poll if consumer is closed (#2152)
-rw-r--r-- | kafka/consumer/group.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 26408c3..4fd57ae 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -651,7 +651,7 @@ class KafkaConsumer(six.Iterator): # Poll for new data until the timeout expires start = time.time() remaining = timeout_ms - while True: + while not self._closed: records = self._poll_once(remaining, max_records, update_offsets=update_offsets) if records: return records @@ -660,7 +660,9 @@ class KafkaConsumer(six.Iterator): remaining = timeout_ms - elapsed_ms if remaining <= 0: - return {} + break + + return {} def _poll_once(self, timeout_ms, max_records, update_offsets=True): """Do one round of polling. In addition to checking for new data, this does |