author      Dana Powers <dana.powers@gmail.com>    2019-09-29 17:07:03 -0700
committer   GitHub <noreply@github.com>            2019-09-29 17:07:03 -0700
commit      0f929bd866f1526fc5d18068c31903f1ae3393d2 (patch)
tree        9b5e7173086e5d10345a82d65c64ea97d46a7fa5
parent      392d674be6641078717a4d87e471916c9a4bbb22 (diff)
download    kafka-python-0f929bd866f1526fc5d18068c31903f1ae3393d2.tar.gz
Change coordinator lock acquisition order (#1821)
-rw-r--r--   kafka/client_async.py       4
-rw-r--r--   kafka/coordinator/base.py   78
2 files changed, 39 insertions, 43 deletions
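
The commit is easier to read with the failure mode in mind: before this change, different code paths could end up acquiring the network client's lock and the coordinator's lock in opposite orders (for example, one thread holding one lock while a poll or connection callback needed the other), which is the classic lock-ordering deadlock. The sketch below uses made-up lock names, not the library's actual code, to show the hazard and why a single acquisition order removes it:

import threading

# Hypothetical stand-ins for KafkaClient._lock and BaseCoordinator._lock.
client_lock = threading.RLock()
coordinator_lock = threading.RLock()

def heartbeat_thread_before():
    # Old pattern: client lock first, then coordinator lock.
    with client_lock:
        with coordinator_lock:
            pass  # send heartbeat

def calling_thread():
    # Meanwhile another thread takes the coordinator lock and then needs
    # the client lock (e.g. to poll the network) -- the opposite order.
    with coordinator_lock:
        with client_lock:
            pass  # poll / send requests

# If each thread grabs its first lock before the other releases, both block
# forever. After this commit, coordinator code no longer takes the client
# lock before its own lock, so the two locks are acquired in one order only.
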
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 9b9cb8f..b002797 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -597,7 +597,9 @@ class KafkaClient(object):
                 self._poll(timeout / 1000)
-                responses.extend(self._fire_pending_completed_requests())
+            # called without the lock to avoid deadlock potential
+            # if handlers need to acquire locks
+            responses.extend(self._fire_pending_completed_requests())
             # If all we had was a timeout (future is None) - only do one poll
             # If we do have a future, we keep looping until it is done
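
The new comment is the client-side half of the fix: handlers attached to completed requests may need other locks (such as the coordinator's), so they must not run while the client's own lock is held. A minimal sketch of that "collect under the lock, fire outside it" pattern, with hypothetical names rather than KafkaClient's real internals:

import threading

class PollingClient(object):
    """Toy client: do I/O under the lock, run response handlers outside it."""

    def __init__(self):
        self._lock = threading.RLock()
        self._completed = []  # (handler, response) pairs produced during I/O

    def poll(self):
        with self._lock:
            self._do_network_io()  # may append to self._completed
            completed, self._completed = self._completed, []

        # Called without the lock, mirroring the comment above: a handler
        # that tries to take another lock cannot deadlock against us.
        responses = []
        for handler, response in completed:
            handler(response)
            responses.append(response)
        return responses

    def _do_network_io(self):
        pass  # placeholder for socket work
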
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 5cdbdcf..700c31f 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -243,7 +243,7 @@ class BaseCoordinator(object):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
- with self._client._lock, self._lock:
+ with self._lock:
while self.coordinator_unknown():
# Prior to 0.8.2 there was no group coordinator
@@ -273,7 +273,7 @@ class BaseCoordinator(object):
         self._find_coordinator_future = None
     def lookup_coordinator(self):
-        with self._client._lock, self._lock:
+        with self._lock:
             if self._find_coordinator_future is not None:
                 return self._find_coordinator_future
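
Each of these hunks narrows the guard from both locks to the coordinator's own lock; the state being protected here (the cached in-flight coordinator-lookup future) never needed the client lock. A rough sketch of that reuse-the-outstanding-future pattern under a single lock, with illustrative names rather than the library's implementation:

import threading

class CoordinatorLookup(object):
    def __init__(self):
        self._lock = threading.RLock()
        self._find_coordinator_future = None

    def lookup_coordinator(self):
        with self._lock:
            # Reuse the outstanding request if one is already in flight.
            if self._find_coordinator_future is not None:
                return self._find_coordinator_future
            future = self._send_find_coordinator_request()
            self._find_coordinator_future = future
            return future

    def _on_lookup_complete(self, result):
        # Presumably registered as a completion callback, matching the
        # context line above that resets _find_coordinator_future to None.
        with self._lock:
            self._find_coordinator_future = None

    def _send_find_coordinator_request(self):
        return object()  # placeholder for a request future
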
@@ -346,7 +346,7 @@ class BaseCoordinator(object):
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
-        with self._client._lock, self._lock:
+        with self._lock:
             if self._heartbeat_thread is None:
                 self._start_heartbeat_thread()
@@ -504,7 +504,7 @@ class BaseCoordinator(object):
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
self.sensors.join_latency.record((time.time() - send_time) * 1000)
- with self._client._lock, self._lock:
+ with self._lock:
if self.state is not MemberState.REBALANCING:
# if the consumer was woken up before a rebalance completes,
# we may have already left the group. In this case, we do
@@ -679,7 +679,7 @@ class BaseCoordinator(object):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._client._lock, self._lock:
+            with self._lock:
                 coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not coordinator_id:
                     # This could happen if coordinator metadata is different
@@ -761,7 +761,7 @@ class BaseCoordinator(object):
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
-        with self._client._lock, self._lock:
+        with self._lock:
             if (not self.coordinator_unknown()
                 and self.state is not MemberState.UNJOINED
                 and self._generation is not Generation.NO_GENERATION):
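
The last hunk below is mostly a re-indent: the heartbeat loop now keeps the coordinator lock for the whole iteration, so backing off becomes a plain wait() on that lock (it is used as a condition variable) instead of re-acquiring it first. A self-contained sketch of that wait-with-timeout shape, with illustrative names only:

import threading
import time

lock = threading.Condition()    # plays the role of coordinator._lock
retry_backoff_ms = 100

def heartbeat_loop(stop):
    while not stop.is_set():
        with lock:               # held for the whole iteration
            if not ready_to_heartbeat():
                # Like coordinator._lock.wait(retry_backoff_ms / 1000):
                # releases the lock while sleeping, re-acquires it on
                # wakeup, and returns early if another thread notifies.
                lock.wait(retry_backoff_ms / 1000)
                continue
            send_heartbeat()

def ready_to_heartbeat():
    return int(time.time()) % 2 == 0    # placeholder condition

def send_heartbeat():
    pass                                # placeholder network call

stop_event = threading.Event()
# threading.Thread(target=heartbeat_loop, args=(stop_event,)).start()
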
@@ -959,46 +959,40 @@ class HeartbeatThread(threading.Thread):
                 self.disable()
                 return
-        # TODO: When consumer.wakeup() is implemented, we need to
-        # disable here to prevent propagating an exception to this
-        # heartbeat thread
-        #
-        # Release coordinator lock during client poll to avoid deadlocks
-        # if/when connection errback needs coordinator lock
-        self.coordinator._client.poll(timeout_ms=0)
-
-        if self.coordinator.coordinator_unknown():
-            future = self.coordinator.lookup_coordinator()
-            if not future.is_done or future.failed():
-                # the immediate future check ensures that we backoff
-                # properly in the case that no brokers are available
-                # to connect to (and the future is automatically failed).
-                with self.coordinator._lock:
+            # TODO: When consumer.wakeup() is implemented, we need to
+            # disable here to prevent propagating an exception to this
+            # heartbeat thread
+            self.coordinator._client.poll(timeout_ms=0)
+
+            if self.coordinator.coordinator_unknown():
+                future = self.coordinator.lookup_coordinator()
+                if not future.is_done or future.failed():
+                    # the immediate future check ensures that we backoff
+                    # properly in the case that no brokers are available
+                    # to connect to (and the future is automatically failed).
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
-        elif self.coordinator.heartbeat.session_timeout_expired():
-            # the session timeout has expired without seeing a
-            # successful heartbeat, so we should probably make sure
-            # the coordinator is still healthy.
-            log.warning('Heartbeat session expired, marking coordinator dead')
-            self.coordinator.coordinator_dead('Heartbeat session expired')
-
-        elif self.coordinator.heartbeat.poll_timeout_expired():
-            # the poll timeout has expired, which means that the
-            # foreground thread has stalled in between calls to
-            # poll(), so we explicitly leave the group.
-            log.warning('Heartbeat poll expired, leaving group')
-            self.coordinator.maybe_leave_group()
-
-        elif not self.coordinator.heartbeat.should_heartbeat():
-            # poll again after waiting for the retry backoff in case
-            # the heartbeat failed or the coordinator disconnected
-            log.log(0, 'Not ready to heartbeat, waiting')
-            with self.coordinator._lock:
+            elif self.coordinator.heartbeat.session_timeout_expired():
+                # the session timeout has expired without seeing a
+                # successful heartbeat, so we should probably make sure
+                # the coordinator is still healthy.
+                log.warning('Heartbeat session expired, marking coordinator dead')
+                self.coordinator.coordinator_dead('Heartbeat session expired')
+
+            elif self.coordinator.heartbeat.poll_timeout_expired():
+                # the poll timeout has expired, which means that the
+                # foreground thread has stalled in between calls to
+                # poll(), so we explicitly leave the group.
+                log.warning('Heartbeat poll expired, leaving group')
+                self.coordinator.maybe_leave_group()
+
+            elif not self.coordinator.heartbeat.should_heartbeat():
+                # poll again after waiting for the retry backoff in case
+                # the heartbeat failed or the coordinator disconnected
+                log.log(0, 'Not ready to heartbeat, waiting')
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
-        else:
-            with self.coordinator._client._lock, self.coordinator._lock:
+            else:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
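
The hunk is cut off here, but the visible tail shows the heartbeat send is asynchronous: the request returns a future and the success handler is attached as a callback, which (after the client_async.py change above) runs without the client lock held. A toy version of that callback-on-completion flow, not kafka-python's actual Future class:

class ToyFuture(object):
    """Minimal stand-in for a request future with success/error callbacks."""

    def __init__(self):
        self.is_done = False
        self._callbacks = []
        self._errbacks = []

    def add_callback(self, fn):
        self._callbacks.append(fn)

    def add_errback(self, fn):
        self._errbacks.append(fn)

    def success(self, value):
        self.is_done = True
        for fn in self._callbacks:
            fn(value)

    def failure(self, error):
        self.is_done = True
        for fn in self._errbacks:
            fn(error)

# Usage in the same style as the hunk above:
future = ToyFuture()
future.add_callback(lambda resp: print('heartbeat ok:', resp))
future.add_errback(lambda err: print('heartbeat failed:', err))
future.success('HeartbeatResponse')    # later fired by the I/O layer
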