summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-13 18:46:28 -0700
committerJeff Widman <jeff@jeffwidman.com>2019-03-13 18:46:28 -0700
commit921c553b6a62a34044e4ae444af65abea3717faa (patch)
treea9bb09665c1b557f95b8c872005fa8075a4482fb
parent7965460a7253a5f5c23e7343c0c06c40e40f471e (diff)
downloadkafka-python-921c553b6a62a34044e4ae444af65abea3717faa.tar.gz
Attempt to join heartbeat thread during close() (#1735)
Underlying issue here is a race on consumer.close() between the client, the connections/sockets, and the heartbeat thread. Although the heartbeat thread is signaled to close, we do not block for it. So when we go on to close the client and its underlying connections, if the heartbeat is still doing work it can cause errors/crashes if it attempts to access the now closed objects (selectors and/or sockets, primarily). So this commit adds a blocking thread join to the heartbeat close. This may cause some additional blocking time on consumer.close() while the heartbeat thread finishes. But it should be small in average case and in the worst case will be no longer than the heartbeat_timeout_ms (though if we timeout the join, race errors may still occur). Fix #1666
-rw-r--r--kafka/coordinator/base.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 664e8d2..e538fda 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -752,9 +752,8 @@ class BaseCoordinator(object):
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
- with self._client._lock, self._lock:
- self._close_heartbeat_thread()
- self.maybe_leave_group()
+ self._close_heartbeat_thread()
+ self.maybe_leave_group()
def maybe_leave_group(self):
"""Leave the current group and reset local generation/memberId."""
@@ -918,6 +917,10 @@ class HeartbeatThread(threading.Thread):
self.closed = True
with self.coordinator._lock:
self.coordinator._lock.notify()
+ if self.is_alive():
+ self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
+ if self.is_alive():
+ log.warning("Heartbeat thread did not fully terminate during close")
def run(self):
try: