summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@rd.io> 2018-03-10 13:50:57 -0500
committer: Dana Powers <dana.powers@rd.io> 2018-03-10 13:50:57 -0500
commit: 159f5a39deda66ec6110f5010c4b2ba56ac4b004 (patch)
tree: 579b7736c95dc65d13a3cff74cac483026b713d3
parent: ec9049c60794785ab6c7babc90759678e665ccd8 (diff)
download: kafka-python-KAFKA_4160_rebalance_listener_without_lock.tar.gz
KAFKA-4160: Ensure rebalance listener not called with coordinator lock
(ref: KAFKA_4160_rebalance_listener_without_lock)
-rw-r--r-- kafka/coordinator/base.py 196
1 files changed, 112 insertions, 84 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 57da971..4827066 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -323,90 +323,108 @@ class BaseCoordinator(object):
return sys.maxsize
return self.heartbeat.time_to_next_heartbeat()
+ def _reset_join_group_future(self):
+ with self._lock:
+ self.join_future = None
+
+ def _initiate_join_group(self):
+ with self._lock:
+ # we store the join future in case we are woken up by the user
+ # after beginning the rebalance in the call to poll below.
+ # This ensures that we do not mistakenly attempt to rejoin
+ # before the pending rebalance has completed.
+ if self.join_future is None:
+ self.state = MemberState.REBALANCING
+ self.join_future = self._send_join_group_request()
+
+ # handle join completion in the callback so that the
+ # callback will be invoked even if the consumer is woken up
+ # before finishing the rebalance
+ self.join_future.add_callback(self._handle_join_success)
+
+ # we handle failures below after the request finishes.
+ # If the join completes after having been woken up, the
+ # exception is ignored and we will rejoin
+ self.join_future.add_errback(self._handle_join_failure)
+
+ return self.join_future
+
def _handle_join_success(self, member_assignment_bytes):
+ # handle join completion in the callback so that the callback
+ # will be invoked even if the consumer is woken up before
+ # finishing the rebalance
with self._lock:
log.info("Successfully joined group %s with generation %s",
self.group_id, self._generation.generation_id)
- self.join_future = None
self.state = MemberState.STABLE
- self.rejoining = False
- self._heartbeat_thread.enable()
- self._on_join_complete(self._generation.generation_id,
- self._generation.member_id,
- self._generation.protocol,
- member_assignment_bytes)
+ if self._heartbeat_thread is not None:
+ self._heartbeat_thread.enable()
def _handle_join_failure(self, _):
+ # we handle failures below after the request finishes.
+ # if the join completes after having been woken up,
+ # the exception is ignored and we will rejoin
with self._lock:
- self.join_future = None
self.state = MemberState.UNJOINED
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
- with self._lock:
- if self._heartbeat_thread is None:
- self._start_heartbeat_thread()
-
- while self.need_rejoin():
- self.ensure_coordinator_ready()
-
- # call on_join_prepare if needed. We set a flag
- # to make sure that we do not call it a second
- # time if the client is woken up before a pending
- # rebalance completes. This must be called on each
- # iteration of the loop because an event requiring
- # a rebalance (such as a metadata refresh which
- # changes the matched subscription set) can occur
- # while another rebalance is still in progress.
- if not self.rejoining:
- self._on_join_prepare(self._generation.generation_id,
- self._generation.member_id)
- self.rejoining = True
-
- # ensure that there are no pending requests to the coordinator.
- # This is important in particular to avoid resending a pending
- # JoinGroup request.
- while not self.coordinator_unknown():
- if not self._client.in_flight_request_count(self.coordinator_id):
- break
- self._client.poll()
- else:
+ self.ensure_coordinator_ready()
+ self._start_heartbeat_thread()
+ self.join_group()
+
+ def join_group(self):
+ while self.need_rejoin():
+ self.ensure_coordinator_ready()
+
+ # call on_join_prepare if needed. We set a flag
+ # to make sure that we do not call it a second
+ # time if the client is woken up before a pending
+ # rebalance completes. This must be called on each
+ # iteration of the loop because an event requiring
+ # a rebalance (such as a metadata refresh which
+ # changes the matched subscription set) can occur
+ # while another rebalance is still in progress.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
+ # fence off the heartbeat thread explicitly so that it cannot
+ # interfere with the join group. # Note that this must come after
+ # the call to onJoinPrepare since we must be able to continue
+ # sending heartbeats if that callback takes some time.
+ self._disable_heartbeat_thread()
+
+ # ensure that there are no pending requests to the coordinator.
+ # This is important in particular to avoid resending a pending
+ # JoinGroup request.
+ while not self.coordinator_unknown():
+ if not self._client.in_flight_request_count(self.coordinator_id):
+ break
+ self._client.poll()
+ else:
+ continue
+
+ future = self._initiate_join_group()
+ self._client.poll(future=future)
+ self._reset_join_group_future()
+
+ if future.succeeded():
+ self.rejoining = False
+ self._on_join_complete(self._generation.generation_id,
+ self._generation.member_id,
+ self._generation.protocol,
+ future.value)
+ else:
+ exception = future.exception
+ if isinstance(exception, (Errors.UnknownMemberIdError,
+ Errors.RebalanceInProgressError,
+ Errors.IllegalGenerationError)):
continue
-
- # we store the join future in case we are woken up by the user
- # after beginning the rebalance in the call to poll below.
- # This ensures that we do not mistakenly attempt to rejoin
- # before the pending rebalance has completed.
- if self.join_future is None:
- self.state = MemberState.REBALANCING
- future = self._send_join_group_request()
-
- self.join_future = future # this should happen before adding callbacks
-
- # handle join completion in the callback so that the
- # callback will be invoked even if the consumer is woken up
- # before finishing the rebalance
- future.add_callback(self._handle_join_success)
-
- # we handle failures below after the request finishes.
- # If the join completes after having been woken up, the
- # exception is ignored and we will rejoin
- future.add_errback(self._handle_join_failure)
-
- else:
- future = self.join_future
-
- self._client.poll(future=future)
-
- if future.failed():
- exception = future.exception
- if isinstance(exception, (Errors.UnknownMemberIdError,
- Errors.RebalanceInProgressError,
- Errors.IllegalGenerationError)):
- continue
- elif not future.retriable():
- raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000)
+ elif not future.retriable():
+ raise exception # pylint: disable-msg=raising-bad-type
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -716,20 +734,27 @@ class BaseCoordinator(object):
self.rejoin_needed = True
def _start_heartbeat_thread(self):
- if self._heartbeat_thread is None:
- log.info('Starting new heartbeat thread')
- self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
- self._heartbeat_thread.daemon = True
- self._heartbeat_thread.start()
+ with self._lock:
+ if self._heartbeat_thread is None:
+ log.info('Starting new heartbeat thread')
+ self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+
+ def _disable_heartbeat_thread(self):
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ self._heartbeat_thread.disable()
def _close_heartbeat_thread(self):
- if self._heartbeat_thread is not None:
- log.info('Stopping heartbeat thread')
- try:
- self._heartbeat_thread.close()
- except ReferenceError:
- pass
- self._heartbeat_thread = None
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ log.info('Stopping heartbeat thread')
+ try:
+ self._heartbeat_thread.close()
+ except ReferenceError:
+ pass
+ self._heartbeat_thread = None
def __del__(self):
self._close_heartbeat_thread()
@@ -892,12 +917,15 @@ class HeartbeatThread(threading.Thread):
def enable(self):
with self.coordinator._lock:
+ log.debug('Enabling heartbeat thread')
self.enabled = True
self.coordinator.heartbeat.reset_timeouts()
self.coordinator._lock.notify()
def disable(self):
- self.enabled = False
+ with self.coordinator._lock:
+ log.debug('Disabling heartbeat thread')
+ self.enabled = False
def close(self):
self.closed = True