summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuangcuiyang <harlan51360@outlook.com>2020-09-08 05:59:24 +0800
committerGitHub <noreply@github.com>2020-09-07 14:59:24 -0700
commit91daea329bb40ed80bddef4635770d24b670b0c6 (patch)
treea48912cf14390f8747e09155efd1ec281499fb38
parentbd557dabd487cc44c11bf003600c82477ea5de11 (diff)
downloadkafka-python-91daea329bb40ed80bddef4635770d24b670b0c6.tar.gz
Fix #1985: fix consumer deadlock when heartbeat thread request timeout (#2064)
-rw-r--r--kafka/coordinator/base.py20
1 files changed, 12 insertions, 8 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index cd110ce..5e41309 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -242,7 +242,7 @@ class BaseCoordinator(object):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
- with self._lock:
+ with self._client._lock, self._lock:
while self.coordinator_unknown():
# Prior to 0.8.2 there was no group coordinator
@@ -345,7 +345,7 @@ class BaseCoordinator(object):
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
- with self._lock:
+ with self._client._lock, self._lock:
if self._heartbeat_thread is None:
self._start_heartbeat_thread()
@@ -763,7 +763,7 @@ class BaseCoordinator(object):
def maybe_leave_group(self):
"""Leave the current group and reset local generation/memberId."""
- with self._lock:
+ with self._client._lock, self._lock:
if (not self.coordinator_unknown()
and self.state is not MemberState.UNJOINED
and self._generation is not Generation.NO_GENERATION):
@@ -946,6 +946,15 @@ class HeartbeatThread(threading.Thread):
log.debug('Heartbeat thread closed')
def _run_once(self):
+ with self.coordinator._client._lock, self.coordinator._lock:
+ if self.enabled and self.coordinator.state is MemberState.STABLE:
+ # TODO: When consumer.wakeup() is implemented, we need to
+ # disable here to prevent propagating an exception to this
+ # heartbeat thread
+ # must get client._lock, or maybe deadlock at heartbeat
+ # failure callbak in consumer poll
+ self.coordinator._client.poll(timeout_ms=0)
+
with self.coordinator._lock:
if not self.enabled:
log.debug('Heartbeat disabled. Waiting')
@@ -961,11 +970,6 @@ class HeartbeatThread(threading.Thread):
self.disable()
return
- # TODO: When consumer.wakeup() is implemented, we need to
- # disable here to prevent propagating an exception to this
- # heartbeat thread
- self.coordinator._client.poll(timeout_ms=0)
-
if self.coordinator.coordinator_unknown():
future = self.coordinator.lookup_coordinator()
if not future.is_done or future.failed():