author | Dana Powers <dana.powers@gmail.com> | 2019-03-13 18:46:28 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-03-13 18:46:28 -0700 |
commit | 921c553b6a62a34044e4ae444af65abea3717faa (patch) | |
tree | a9bb09665c1b557f95b8c872005fa8075a4482fb | |
parent | 7965460a7253a5f5c23e7343c0c06c40e40f471e (diff) | |
download | kafka-python-921c553b6a62a34044e4ae444af65abea3717faa.tar.gz |
Attempt to join heartbeat thread during close() (#1735)
The underlying issue is a race in consumer.close() between the client, its connections/sockets, and the heartbeat thread. Although the heartbeat thread is signaled to close, we do not block waiting for it. When we then go on to close the client and its underlying connections, a heartbeat thread that is still doing work can cause errors or crashes if it tries to access the now-closed objects (primarily selectors and/or sockets).
This commit therefore adds a blocking thread join to the heartbeat close. consumer.close() may block a little longer while the heartbeat thread finishes, but the wait should be small in the average case and in the worst case is bounded by heartbeat_interval_ms, the join timeout used in the code. If the join times out, race errors may still occur.
Fix #1666
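The signal-then-join pattern described above can be sketched in isolation. This is a minimal illustration, not the kafka-python implementation: the `Worker` class, its `interval_s` parameter, and the boolean return value of `close()` are hypothetical names chosen for the example. It also shows one plausible reason for no longer holding the lock across the whole close: the joining caller must release the lock before `join()`, or the worker could never reacquire it to wake up and exit.

```python
import threading


class Worker(threading.Thread):
    """Hypothetical stand-in for a heartbeat-style background thread."""

    def __init__(self, interval_s):
        super().__init__(daemon=True)
        self.interval_s = interval_s
        self.closed = False
        self._cond = threading.Condition()

    def run(self):
        with self._cond:
            while not self.closed:
                # wait() releases the lock while sleeping and reacquires
                # it before returning, so close() can get in and notify.
                self._cond.wait(self.interval_s)

    def close(self):
        # Signal the thread while holding the lock...
        with self._cond:
            self.closed = True
            self._cond.notify()
        # ...but join only after releasing it, so the worker can wake up.
        if self.is_alive():
            self.join(self.interval_s)  # bounded wait, in seconds
        # True if the thread fully terminated, False if the join timed out.
        return not self.is_alive()


worker = Worker(interval_s=0.5)
worker.start()
terminated = worker.close()
```

If `terminated` is false, the join timed out and the caller is back in the situation the commit message warns about: resources may be torn down while the thread is still running.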
-rw-r--r-- | kafka/coordinator/base.py | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 664e8d2..e538fda 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -752,9 +752,8 @@ class BaseCoordinator(object):
     def close(self):
         """Close the coordinator, leave the current group,
         and reset local generation / member_id"""
-        with self._client._lock, self._lock:
-            self._close_heartbeat_thread()
-            self.maybe_leave_group()
+        self._close_heartbeat_thread()
+        self.maybe_leave_group()
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
@@ -918,6 +917,10 @@ class HeartbeatThread(threading.Thread):
         self.closed = True
         with self.coordinator._lock:
             self.coordinator._lock.notify()
+        if self.is_alive():
+            self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
+        if self.is_alive():
+            log.warning("Heartbeat thread did not fully terminate during close")
 
     def run(self):
         try: