summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2019-01-12 18:14:53 -0800
committer Dana Powers <dana.powers@gmail.com> 2019-01-12 18:14:53 -0800
commit 8c794658d84c27b94326163ae92b120807106c98 (patch)
tree 69816bf7043ccb79ac711f87877c60ffc2e18c90
parent 1a31be52ec012dfa0ef5079ff9982e01408a8fe1 (diff)
download kafka-python-consumer_rejoin_fixes.tar.gz
Improve KafkaConsumer join group / only enable Heartbeat Thread during stable group [branch: consumer_rejoin_fixes]
-rw-r--r-- kafka/coordinator/base.py 34
1 files changed, 23 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 8ce9a24..1435183 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -331,18 +331,13 @@ class BaseCoordinator(object):
with self._lock:
log.info("Successfully joined group %s with generation %s",
self.group_id, self._generation.generation_id)
- self.join_future = None
self.state = MemberState.STABLE
- self.rejoining = False
- self._heartbeat_thread.enable()
- self._on_join_complete(self._generation.generation_id,
- self._generation.member_id,
- self._generation.protocol,
- member_assignment_bytes)
+ self.rejoin_needed = False
+ if self._heartbeat_thread:
+ self._heartbeat_thread.enable()
def _handle_join_failure(self, _):
with self._lock:
- self.join_future = None
self.state = MemberState.UNJOINED
def ensure_active_group(self):
@@ -351,7 +346,7 @@ class BaseCoordinator(object):
if self._heartbeat_thread is None:
self._start_heartbeat_thread()
- while self.need_rejoin():
+ while self.need_rejoin() or self._rejoin_incomplete():
self.ensure_coordinator_ready()
# call on_join_prepare if needed. We set a flag
@@ -382,6 +377,12 @@ class BaseCoordinator(object):
# This ensures that we do not mistakenly attempt to rejoin
# before the pending rebalance has completed.
if self.join_future is None:
+ # Fence off the heartbeat thread explicitly so that it cannot
+ # interfere with the join group. Note that this must come after
+ # the call to _on_join_prepare since we must be able to continue
+ # sending heartbeats if that callback takes some time.
+ self._heartbeat_thread.disable()
+
self.state = MemberState.REBALANCING
future = self._send_join_group_request()
@@ -402,7 +403,16 @@ class BaseCoordinator(object):
self._client.poll(future=future)
- if future.failed():
+ if future.succeeded():
+ self._on_join_complete(self._generation.generation_id,
+ self._generation.member_id,
+ self._generation.protocol,
+ future.value)
+ self.join_future = None
+ self.rejoining = False
+
+ else:
+ self.join_future = None
exception = future.exception
if isinstance(exception, (Errors.UnknownMemberIdError,
Errors.RebalanceInProgressError,
@@ -412,6 +422,9 @@ class BaseCoordinator(object):
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000)
+ def _rejoin_incomplete(self):
+ return self.join_future is not None
+
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -497,7 +510,6 @@ class BaseCoordinator(object):
self._generation = Generation(response.generation_id,
response.member_id,
response.group_protocol)
- self.rejoin_needed = False
if response.leader_id == response.member_id:
log.info("Elected group leader -- performing partition"