summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-12 12:16:05 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-12 18:21:32 -0800
commit561a678d1de1604262be43d47919fa68bdf17b17 (patch)
tree9cef94dc5ff8315dcbb970f0eaf09e98f374f121
parentfb0b49827ff78bebd0a84c86d890394b00795bcf (diff)
downloadkafka-python-consumer_heartbeat_fixes.tar.gz
Consumer should timeout internal iterator if heartbeat ttl is expiredconsumer_heartbeat_fixes
-rw-r--r--kafka/consumer/group.py18
1 files changed, 15 insertions, 3 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 637ef93..9db4b5d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -752,9 +752,21 @@ class KafkaConsumer(six.Iterator):
self._fetcher.init_fetches()
def _next_timeout(self):
- return min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
+ timeout = min(self._consumer_timeout,
+ self._client._delayed_tasks.next_at() + time.time(),
+ self._client.cluster.ttl() / 1000.0 + time.time())
+
+ # Although the delayed_tasks timeout above should cover processing
+ # HeartbeatRequests, it is still possible that HeartbeatResponses
+ # are left unprocessed during a long _fetcher iteration without
+ # an intermediate poll(). And because tasks are responsible for
+ # rescheduling themselves, an unprocessed response will prevent
+ # the next heartbeat from being sent. This check should help
+ # avoid that.
+ if self._use_consumer_group():
+ heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+ timeout = min(timeout, heartbeat)
+ return timeout
def __iter__(self): # pylint: disable=non-iterator-returned
return self