-rw-r--r--  kafka/consumer/group.py        |  21
-rw-r--r--  kafka/coordinator/base.py      | 583
-rw-r--r--  kafka/coordinator/consumer.py  |  83
-rw-r--r--  test/test_coordinator.py       |   6
4 files changed, 501 insertions(+), 192 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index e37be0e..42b59b9 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -263,7 +263,7 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'max_poll_records': 500,
'max_poll_interval_ms': 300000,
- 'session_timeout_ms': 10000,
+ 'session_timeout_ms': 10000, # XXX should be 30000 if < 0.11
'heartbeat_interval_ms': 3000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -294,12 +294,13 @@ class KafkaConsumer(six.Iterator):
def __init__(self, *topics, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
+ configs_copy = copy.copy(configs)
for key in self.config:
if key in configs:
- self.config[key] = configs.pop(key)
+ self.config[key] = configs_copy.pop(key)
# Only check for extra config keys in top-level class
- assert not configs, 'Unrecognized configs: %s' % configs
+ assert not configs_copy, 'Unrecognized configs: %s' % configs_copy
deprecated = {'smallest': 'earliest', 'largest': 'latest'}
if self.config['auto_offset_reset'] in deprecated:
@@ -602,16 +603,6 @@ class KafkaConsumer(six.Iterator):
"""
self._coordinator.poll()
- """
- if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
- self._coordinator.ensure_active_group()
-
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
- elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
- """
-
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
@@ -1023,12 +1014,12 @@ class KafkaConsumer(six.Iterator):
while time.time() < self._consumer_timeout:
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index af0936c..7c1d468 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import, division
import abc
import copy
import logging
+import sys
+import threading
import time
import weakref
@@ -20,6 +21,28 @@ from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
log = logging.getLogger('kafka.coordinator')
+class MemberState(object):
+ UNJOINED = '<unjoined>' # the client is not part of a group
+ REBALANCING = '<rebalancing>' # the client has begun rebalancing
+ STABLE = '<stable>' # the client has joined and is sending heartbeats
+
+
+class Generation(object):
+ def __init__(self, generation_id, member_id, protocol):
+ self.generation_id = generation_id
+ self.member_id = member_id
+ self.protocol = protocol
+
+Generation.NO_GENERATION = Generation(
+ OffsetCommitRequest[2].DEFAULT_GENERATION_ID,
+ JoinGroupRequest[0].UNKNOWN_MEMBER_ID,
+ None)
+
+
+class UnjoinedGroupException(Errors.KafkaError):
+ retriable = True
+
+
class BaseCoordinator(object):
"""
BaseCoordinator implements group management for a single group member
@@ -47,12 +70,21 @@ class BaseCoordinator(object):
:meth:`.group_protocols` and the format of the state assignment provided by
the leader in :meth:`._perform_assignment` and which becomes available to
members in :meth:`._on_join_complete`.
+
+ Note on locking: this class shares state between the caller and a background
+ thread which is used for sending heartbeats after the client has joined the
+ group. All mutable state as well as state transitions are protected with the
+ class's monitor. Generally this means acquiring the lock before reading or
+ writing the state of the group (e.g. generation, member_id) and holding the
+ lock when sending a request that affects the state of the group
+ (e.g. JoinGroup, LeaveGroup).
"""
DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
- 'session_timeout_ms': 30000,
+ 'session_timeout_ms': 10000, # XXX 30000 for < 0.11 brokers
'heartbeat_interval_ms': 3000,
+ 'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
'metric_group_prefix': '',
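The "Note on locking" added above describes the concurrency model this patch introduces: a single Condition guards all mutable group state, the caller mutates that state while holding it, and the background heartbeat thread sleeps on it and is woken with notify(). A minimal standalone sketch of the same pattern (Worker and its fields are illustrative, not part of kafka-python):

    import threading

    class Worker(object):
        """Toy version of the coordinator / heartbeat-thread relationship."""
        def __init__(self):
            self._lock = threading.Condition()   # guards all mutable state below
            self.enabled = False
            self.closed = False

        def enable(self):
            with self._lock:
                self.enabled = True
                self._lock.notify()              # wake the background thread

        def close(self):
            with self._lock:
                self.closed = True
                self._lock.notify()

        def run(self):
            while True:
                with self._lock:
                    if self.closed:
                        return
                    if not self.enabled:
                        self._lock.wait()        # releases the lock while sleeping
                        continue
                    # ... do one unit of work while holding the lock ...
                    self._lock.wait(0.1)         # bounded wait between iterations

    w = Worker()
    t = threading.Thread(target=w.run)
    t.start()
    w.enable()
    w.close()
    t.join()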
@@ -84,14 +116,16 @@ class BaseCoordinator(object):
self.config[key] = configs[key]
self._client = client
- self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
- self.coordinator_id = None
- self.rejoin_needed = True
- self.rejoining = False
self.heartbeat = Heartbeat(**self.config)
- self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
+ self._heartbeat_thread = None
+ self._lock = threading.Condition()
+ self.rejoin_needed = True
+ self.rejoining = False # renamed / complement of java needsJoinPrepare
+ self.state = MemberState.UNJOINED
+ self.join_future = None
+ self.coordinator_id = None
+ self._generation = Generation.NO_GENERATION
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
self.config['metric_group_prefix'])
@@ -102,7 +136,7 @@ class BaseCoordinator(object):
@abc.abstractmethod
def protocol_type(self):
"""
- Unique identifier for the class of protocols implements
+ Unique identifier for the class of supported protocols
(e.g. "consumer" or "connect").
Returns:
@@ -186,43 +220,51 @@ class BaseCoordinator(object):
Returns:
bool: True if the coordinator is unknown
"""
- if self.coordinator_id is None:
- return True
+ return self.coordinator() is None
- if self._client.is_disconnected(self.coordinator_id):
- self.coordinator_dead('Node Disconnected')
- return True
+ def coordinator(self):
+ """Get the current coordinator
- return False
+ Returns: the current coordinator id or None if it is unknown
+ """
+ with self._lock:
+ if self.coordinator_id is None:
+ return None
+ elif self._client.is_disconnected(self.coordinator_id):
+ self.coordinator_dead('Node Disconnected')
+ return None
+ else:
+ return self.coordinator_id
- def ensure_coordinator_known(self):
+ def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
- while self.coordinator_unknown():
-
- # Prior to 0.8.2 there was no group coordinator
- # so we will just pick a node at random and treat
- # it as the "coordinator"
- if self.config['api_version'] < (0, 8, 2):
- self.coordinator_id = self._client.least_loaded_node()
- if self.coordinator_id is not None:
- self._client.ready(self.coordinator_id)
- continue
-
- future = self._send_group_coordinator_request()
- self._client.poll(future=future)
-
- if future.failed():
- if future.retriable():
- if getattr(future.exception, 'invalid_metadata', False):
- log.debug('Requesting metadata for group coordinator request: %s', future.exception)
- metadata_update = self._client.cluster.request_update()
- self._client.poll(future=metadata_update)
+ with self._lock:
+ while self.coordinator_unknown():
+
+ # Prior to 0.8.2 there was no group coordinator
+ # so we will just pick a node at random and treat
+ # it as the "coordinator"
+ if self.config['api_version'] < (0, 8, 2):
+ self.coordinator_id = self._client.least_loaded_node()
+ if self.coordinator_id is not None:
+ self._client.ready(self.coordinator_id)
+ continue
+
+ future = self._send_group_coordinator_request()
+ self._client.poll(future=future)
+
+ if future.failed():
+ if future.retriable():
+ if getattr(future.exception, 'invalid_metadata', False):
+ log.debug('Requesting metadata for group coordinator request: %s', future.exception)
+ metadata_update = self._client.cluster.request_update()
+ self._client.poll(future=metadata_update)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
- time.sleep(self.config['retry_backoff_ms'] / 1000)
- else:
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
@@ -230,49 +272,117 @@ class BaseCoordinator(object):
Returns:
bool: True if it should, False otherwise
"""
- return self.rejoin_needed
+ with self._lock:
+ return self.rejoin_needed
+
+ def poll_heartbeat(self):
+ """
+ Check the status of the heartbeat thread (if it is active) and indicate
+ the liveness of the client. This must be called periodically after
+ joining with :meth:`.ensure_active_group` to ensure that the member stays
+ in the group. If an interval of time longer than the provided rebalance
+ timeout expires without calling this method, then the client will
+ proactively leave the group.
+
+ Raises: RuntimeError for unexpected errors raised from the heartbeat thread
+ """
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ if self._heartbeat_thread.failed:
+ # set the heartbeat thread to None and raise an exception.
+ # If the user catches it, the next call to ensure_active_group()
+ # will spawn a new heartbeat thread.
+ cause = self._heartbeat_thread.failed
+ self._heartbeat_thread = None
+ raise cause
+ self.heartbeat.poll()
+
+ def time_to_next_heartbeat(self):
+ with self._lock:
+ # if we have not joined the group, we don't need to send heartbeats
+ if self.state is MemberState.UNJOINED:
+ return sys.maxsize
+ return self.heartbeat.time_to_next_heartbeat()
+
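poll_heartbeat() and time_to_next_heartbeat() above define the contract between the foreground thread and the heartbeat thread: the caller must keep invoking poll_heartbeat() (otherwise the background thread eventually leaves the group via the poll timeout), and time_to_next_heartbeat() tells the caller how long it may block before the next heartbeat is due. A hedged sketch of a foreground loop honoring that contract (fetch_once is a placeholder, not a kafka-python API):

    import time

    def poll_loop(coordinator, fetch_once, timeout=30.0):
        """Illustrative caller loop; only the two coordinator methods are from this patch."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Record caller liveness and surface any heartbeat-thread failure.
            coordinator.poll_heartbeat()
            # Never block past the caller deadline or the next heartbeat deadline.
            max_wait = min(deadline - time.time(),
                           coordinator.time_to_next_heartbeat())
            fetch_once(max_wait)    # placeholder for the actual fetch / client poll step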
+ def _handle_join_success(self, member_assignment_bytes):
+ with self._lock:
+ log.info("Successfully joined group %s with generation %s",
+ self.group_id, self._generation.generation_id)
+ self.join_future = None
+ self.state = MemberState.STABLE
+ self.rejoining = False
+ self._heartbeat_thread.enable()
+ self._on_join_complete(self._generation.generation_id,
+ self._generation.member_id,
+ self._generation.protocol,
+ member_assignment_bytes)
+
+ def _handle_join_failure(self, exception):
+ with self._lock:
+ self.join_future = None
+ self.state = MemberState.UNJOINED
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
- if not self.need_rejoin():
- return
-
- if not self.rejoining:
- self._on_join_prepare(self.generation, self.member_id)
- self.rejoining = True
-
- while self.need_rejoin():
- self.ensure_coordinator_known()
-
- # ensure that there are no pending requests to the coordinator.
- # This is important in particular to avoid resending a pending
- # JoinGroup request.
- while not self.coordinator_unknown():
- if not self._client.in_flight_request_count(self.coordinator_id):
- break
- self._client.poll(delayed_tasks=False)
- else:
- continue
-
- future = self._send_join_group_request()
- self._client.poll(future=future)
+ with self._lock:
+ if not self.need_rejoin():
+ return
- if future.succeeded():
- member_assignment_bytes = future.value
- self._on_join_complete(self.generation, self.member_id,
- self.protocol, member_assignment_bytes)
- self.rejoining = False
- self.heartbeat_task.reset()
- else:
- assert future.failed()
- exception = future.exception
- if isinstance(exception, (Errors.UnknownMemberIdError,
- Errors.RebalanceInProgressError,
- Errors.IllegalGenerationError)):
+ # call on_join_prepare if needed. We set a flag to make sure that
+ # we do not call it a second time if the client is woken up before
+ # a pending rebalance completes.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
+ if self._heartbeat_thread is None:
+ self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+ self._heartbeat_thread.start()
+
+ while self.need_rejoin():
+ self.ensure_coordinator_ready()
+
+ # ensure that there are no pending requests to the coordinator.
+ # This is important in particular to avoid resending a pending
+ # JoinGroup request.
+ while not self.coordinator_unknown():
+ if not self._client.in_flight_request_count(self.coordinator_id):
+ break
+ self._client.poll(delayed_tasks=False)
+ else:
continue
- elif not future.retriable():
- raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000)
+
+ # we store the join future in case we are woken up by the user
+ # after beginning the rebalance in the call to poll below.
+ # This ensures that we do not mistakenly attempt to rejoin
+ # before the pending rebalance has completed.
+ if self.join_future is None:
+ self.state = MemberState.REBALANCING
+ self.join_future = self._send_join_group_request()
+
+ # handle join completion in the callback so that the
+ # callback will be invoked even if the consumer is woken up
+ # before finishing the rebalance
+ self.join_future.add_callback(self._handle_join_success)
+
+ # we handle failures below after the request finishes.
+ # If the join completes after having been woken up, the
+ # exception is ignored and we will rejoin
+ self.join_future.add_errback(self._handle_join_failure)
+
+ future = self.join_future
+ self._client.poll(future=future)
+
+ if future.failed():
+ exception = future.exception
+ if isinstance(exception, (Errors.UnknownMemberIdError,
+ Errors.RebalanceInProgressError,
+ Errors.IllegalGenerationError)):
+ continue
+ elif not future.retriable():
+ raise exception # pylint: disable-msg=raising-bad-type
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
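The comments above explain why the JoinGroup future is stored on the coordinator and completed through callbacks: an interrupted poll (for example a consumer wakeup) must neither lose a join that actually succeeded nor trigger a second JoinGroup while one is already in flight. A self-contained toy of that guard (ToyFuture and Joiner are illustrative, not kafka-python classes):

    class ToyFuture(object):
        def __init__(self):
            self.is_done = False
            self._callbacks = []

        def add_callback(self, fn):
            self._callbacks.append(fn)

        def success(self, value):
            self.is_done = True
            for fn in self._callbacks:
                fn(value)

    class Joiner(object):
        def __init__(self):
            self.join_future = None      # mirrors BaseCoordinator.join_future
            self.joined_with = None

        def maybe_send_join(self):
            if self.join_future is None:             # guard against duplicate sends
                self.join_future = ToyFuture()
                self.join_future.add_callback(self._on_join)
            return self.join_future

        def _on_join(self, assignment):
            self.joined_with = assignment
            self.join_future = None                  # mirrors _handle_join_success

    joiner = Joiner()
    f1 = joiner.maybe_send_join()
    f2 = joiner.maybe_send_join()
    assert f1 is f2                                  # no duplicate request while one is pending
    f1.success(b'assignment')                        # callback fires even if the caller was woken up
    assert joiner.joined_with == b'assignment' and joiner.join_future is None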
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -294,14 +404,35 @@ class BaseCoordinator(object):
# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
- request = JoinGroupRequest[0](
- self.group_id,
- self.config['session_timeout_ms'],
- self.member_id,
- self.protocol_type(),
- [(protocol,
- metadata if isinstance(metadata, bytes) else metadata.encode())
- for protocol, metadata in self.group_protocols()])
+ member_metadata = [
+ (protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
+ for protocol, metadata in self.group_protocols()
+ ]
+ if self.config['api_version'] < (0, 9):
+ raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
+ elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
+ request = JoinGroupRequest[0](
+ self.group_id,
+ self.config['session_timeout_ms'], # or max(session, max_poll_interval_ms) ?
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
+ elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
+ request = JoinGroupRequest[1](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self.config['max_poll_interval_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
+ else:
+ request = JoinGroupRequest[2](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self.config['max_poll_interval_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
@@ -327,19 +458,25 @@ class BaseCoordinator(object):
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
- self.member_id = response.member_id
- self.generation = response.generation_id
- self.rejoin_needed = False
- self.protocol = response.group_protocol
- log.info("Joined group '%s' (generation %s) with member_id %s",
- self.group_id, self.generation, self.member_id)
self.sensors.join_latency.record((time.time() - send_time) * 1000)
- if response.leader_id == response.member_id:
- log.info("Elected group leader -- performing partition"
- " assignments using %s", self.protocol)
- self._on_join_leader(response).chain(future)
- else:
- self._on_join_follower().chain(future)
+ with self._lock:
+ if self.state is not MemberState.REBALANCING:
+ # if the consumer was woken up before a rebalance completes,
+ # we may have already left the group. In this case, we do
+ # not want to continue with the sync group.
+ future.failure(UnjoinedGroupException())
+ else:
+ self._generation = Generation(response.generation_id,
+ response.member_id,
+ response.group_protocol)
+ self.rejoin_needed = False
+
+ if response.leader_id == response.member_id:
+ log.info("Elected group leader -- performing partition"
+ " assignments using %s", self.protocol)
+ self._on_join_leader(response).chain(future)
+ else:
+ self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator %s"
@@ -349,7 +486,7 @@ class BaseCoordinator(object):
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
- error = error_type(self.member_id)
+ error = error_type(self._generation.member_id)
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+ self.reset_generation()
log.debug("Attempt to join group %s failed due to unknown member id",
self.group_id)
future.failure(error)
@@ -379,10 +516,11 @@ class BaseCoordinator(object):
def _on_join_follower(self):
# send follower's sync group with an empty assignment
- request = SyncGroupRequest[0](
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = SyncGroupRequest[version](
self.group_id,
- self.generation,
- self.member_id,
+ self._generation.generation_id,
+ self._generation.member_id,
{})
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
@@ -406,10 +544,11 @@ class BaseCoordinator(object):
except Exception as e:
return Future().failure(e)
- request = SyncGroupRequest[0](
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = SyncGroupRequest[version](
self.group_id,
- self.generation,
- self.member_id,
+ self._generation.generation_id,
+ self._generation.member_id,
[(member_id,
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in six.iteritems(group_assignment)])
@@ -439,14 +578,12 @@ class BaseCoordinator(object):
def _handle_sync_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("Successfully joined group %s with generation %s",
- self.group_id, self.generation)
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return
# Always rejoin on error
- self.rejoin_needed = True
+ self.request_rejoin()
if error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.RebalanceInProgressError:
@@ -457,7 +594,7 @@ class BaseCoordinator(object):
Errors.IllegalGenerationError):
error = error_type()
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+ self.reset_generation()
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
@@ -495,30 +632,24 @@ class BaseCoordinator(object):
def _handle_group_coordinator_response(self, future, response):
log.debug("Received group coordinator response %s", response)
- if not self.coordinator_unknown():
- # We already found the coordinator, so ignore the request
- log.debug("Coordinator already known -- ignoring metadata response")
- future.success(self.coordinator_id)
- return
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- ok = self._client.cluster.add_group_coordinator(self.group_id, response)
- if not ok:
- # This could happen if coordinator metadata is different
- # than broker metadata
- future.failure(Errors.IllegalStateError())
- return
-
- self.coordinator_id = response.coordinator_id
- log.info("Discovered coordinator %s for group %s",
- self.coordinator_id, self.group_id)
- self._client.ready(self.coordinator_id)
-
- # start sending heartbeats only if we have a valid generation
- if self.generation > 0:
- self.heartbeat_task.reset()
+ with self._lock:
+ ok = self._client.cluster.add_group_coordinator(self.group_id, response)
+ if not ok:
+ # This could happen if coordinator metadata is different
+ # than broker metadata
+ future.failure(Errors.IllegalStateError())
+ return
+
+ self.coordinator_id = response.coordinator_id
+ log.info("Discovered coordinator %s for group %s",
+ self.coordinator_id, self.group_id)
+ self._client.ready(self.coordinator_id)
+ self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)
+
elif error_type is Errors.GroupCoordinatorNotAvailableError:
log.debug("Group Coordinator Not Available; retry")
future.failure(error_type())
@@ -528,45 +659,74 @@ class BaseCoordinator(object):
future.failure(error)
else:
error = error_type()
- log.error("Unrecognized failure in Group Coordinator Request: %s",
- error)
+ log.error("Group coordinator lookup for group %s failed: %s",
+ self.group_id, error)
future.failure(error)
def coordinator_dead(self, error):
"""Mark the current coordinator as dead."""
- if self.coordinator_id is not None:
- log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
- self.coordinator_id, self.group_id, error)
- self.coordinator_id = None
+ with self._lock:
+ if self.coordinator_id is not None:
+ log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+ self.coordinator_id, self.group_id, error)
+ self.coordinator_id = None
+
+ def generation(self):
+ """Get the current generation state if the group is stable.
+
+ Returns: the current generation or None if the group is unjoined/rebalancing
+ """
+ with self._lock:
+ if self.state is not MemberState.STABLE:
+ return None
+ return self._generation
+
+ def reset_generation(self):
+ """Reset the generation and memberId because we have fallen out of the group."""
+ with self._lock:
+ self._generation = Generation.NO_GENERATION
+ self.rejoin_needed = True
+ self.state = MemberState.UNJOINED
+
+ def request_rejoin(self):
+ with self._lock:
+ self.rejoin_needed = True
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
- try:
- self._client.unschedule(self.heartbeat_task)
- except KeyError:
- pass
-
- if not self.coordinator_unknown() and self.generation > 0:
- # this is a minimal effort attempt to leave the group. we do not
- # attempt any resending if the request fails or times out.
- log.info('Leaving consumer group (%s).', self.group_id)
- request = LeaveGroupRequest[0](self.group_id, self.member_id)
- future = self._client.send(self.coordinator_id, request)
- future.add_callback(self._handle_leave_group_response)
- future.add_errback(log.error, "LeaveGroup request failed: %s")
- self._client.poll(future=future)
-
- self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
- self.rejoin_needed = True
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ self._heartbeat_thread.close()
+ self.maybe_leave_group()
+
+ def maybe_leave_group(self):
+ """Leave the current group and reset local generation/memberId."""
+ with self._lock:
+ if (not self.coordinator_unknown()
+ and self.state is not MemberState.UNJOINED
+ and self._generation is not Generation.NO_GENERATION):
+
+ # this is a minimal effort attempt to leave the group. we do not
+ # attempt any resending if the request fails or times out.
+ log.info('Leaving consumer group (%s).', self.group_id)
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
+ future = self._client.send(self.coordinator_id, request)
+ future.add_callback(self._handle_leave_group_response)
+ future.add_errback(log.error, "LeaveGroup request failed: %s")
+ self._client.poll(future=future, wakeup=False) # XXX
+
+ self.reset_generation()
def _handle_leave_group_response(self, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("LeaveGroup request succeeded")
+ log.debug("LeaveGroup request for group %s returned successfully",
+ self.group_id)
else:
- log.error("LeaveGroup request failed: %s", error_type())
+ log.error("LeaveGroup request for group %s failed with error: %s",
+ self.group_id, error_type())
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
@@ -578,7 +738,10 @@ class BaseCoordinator(object):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)
- request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = HeartbeatRequest[version](self.group_id,
+ self._generation.generation_id,
+ self._generation.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
@@ -598,24 +761,23 @@ class BaseCoordinator(object):
Errors.NotCoordinatorForGroupError):
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
- self.coordinator_id)
+ self.coordinator())
self.coordinator_dead(error_type())
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
- self.rejoin_needed = True
+ self.request_rejoin()
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
- self.rejoin_needed = True
+ self.reset_generation()
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
- self.rejoin_needed = True
+ self.reset_generation()
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
@@ -743,6 +905,107 @@ class GroupCoordinatorMetrics(object):
metrics.add_metric(metrics.metric_name(
'last-heartbeat-seconds-ago', self.metric_group_name,
- 'The number of seconds since the last controller heartbeat',
+ 'The number of seconds since the last controller heartbeat was sent',
tags), AnonMeasurable(
lambda _, now: (now / 1000) - self.heartbeat.last_send))
+
+
+class HeartbeatThread(threading.Thread):
+ def __init__(self, coordinator):
+ super(HeartbeatThread, self).__init__()
+ self.coordinator = coordinator
+ self.enabled = False
+ self.closed = False
+ self.failed = None
+
+ def enable(self):
+ with self.coordinator._lock:
+ self.enabled = True
+ self.coordinator.heartbeat.reset_timeouts()
+ self.coordinator._lock.notify()
+
+ def disable(self):
+ with self.coordinator._lock:
+ self.enabled = False
+
+ def close(self):
+ with self.coordinator._lock:
+ self.closed = True
+ self.coordinator._lock.notify()
+
+ def run(self):
+ try:
+ find_coordinator_future = None
+
+ while True:
+ with self.coordinator._lock:
+ if self.closed:
+ return
+
+ if not self.enabled:
+ self.coordinator._lock.wait()
+ continue
+
+ if self.coordinator.state is not MemberState.STABLE:
+ # the group is not stable (perhaps because we left the
+ # group or because the coordinator kicked us out), so
+ # disable heartbeats and wait for the main thread to rejoin.
+ self.disable()
+ continue
+
+ # Disabling wakeup here is intended to prevent a call to
+ # consumer.wakeup() to propagate an exception to this
+ # heartbeat thread
+ self.coordinator._client.poll(wakeup=False) # XXX
+
+ if self.coordinator.coordinator_unknown():
+ if find_coordinator_future is None or find_coordinator_future.is_done:
+ find_coordinator_future = self.coordinator.lookup_coordinator()
+ else:
+ self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+ elif self.coordinator.heartbeat.session_timeout_expired():
+ # the session timeout has expired without seeing a
+ # successful heartbeat, so we should probably make sure
+ # the coordinator is still healthy.
+ self.coordinator.coordinator_dead('Heartbeat session expired')
+
+ elif self.coordinator.heartbeat.poll_timeout_expired():
+ # the poll timeout has expired, which means that the
+ # foreground thread has stalled in between calls to
+ # poll(), so we explicitly leave the group.
+ self.coordinator.maybe_leave_group()
+
+ elif not self.coordinator.heartbeat.should_heartbeat():
+ # poll again after waiting for the retry backoff in case
+ # the heartbeat failed or the coordinator disconnected
+ self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+ else:
+ self.coordinator.heartbeat.send_heartbeat()
+ future = self.coordinator._send_heartbeat_request()
+ future.add_callback(self._handle_heartbeat_success)
+ future.add_errback(self._handle_heartbeat_failure)
+
+ except RuntimeError as e:
+ log.error("Heartbeat thread for group %s failed due to unexpected error",
+ self.coordinator.group_id, e)
+ self.failed = e
+
+ def _handle_heartbeat_success(self, result):
+ with self.coordinator._lock:
+ self.coordinator.heartbeat.receive_heartbeat()
+
+ def _handle_heartbeat_failure(self, exception):
+ with self.coordinator._lock:
+ if isinstance(exception, Errors.RebalanceInProgressError):
+ # it is valid to continue heartbeating while the group is
+ # rebalancing. This ensures that the coordinator keeps the
+ # member in the group for as long as the duration of the
+ # rebalance timeout. If we stop sending heartbeats, however,
+ # then the session timeout may expire before we can rejoin.
+ self.coordinator.heartbeat.receive_heartbeat()
+ else:
+ self.coordinator.heartbeat.fail_heartbeat()
+ # wake up the thread if it's sleeping to reschedule the heartbeat
+ self.coordinator._lock.notify()
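The run() loop above enforces two distinct timers: session_timeout_ms (no successful heartbeat responses, so the coordinator is presumed dead) and max_poll_interval_ms (the foreground thread stopped calling poll(), so the member proactively leaves the group). This commit does not touch kafka/coordinator/heartbeat.py, so the helper methods it calls (poll, reset_timeouts, session_timeout_expired, poll_timeout_expired, should_heartbeat) are assumed here; a sketch of the bookkeeping they imply, with assumed names and semantics:

    import time

    class HeartbeatState(object):
        """Illustrative timer bookkeeping; field names and behavior are assumptions."""

        def __init__(self, session_timeout_ms, max_poll_interval_ms, heartbeat_interval_ms):
            now = time.time()
            self.session_timeout = session_timeout_ms / 1000.0
            self.max_poll_interval = max_poll_interval_ms / 1000.0
            self.interval = heartbeat_interval_ms / 1000.0
            self.last_send = now      # last heartbeat request sent
            self.last_receive = now   # last successful heartbeat response
            self.last_poll = now      # last time the foreground thread called poll_heartbeat()

        def poll(self):
            self.last_poll = time.time()

        def reset_timeouts(self):
            self.last_send = self.last_receive = self.last_poll = time.time()

        def sent(self):
            self.last_send = time.time()

        def received(self):
            self.last_receive = time.time()

        def should_heartbeat(self):
            return time.time() - self.last_send >= self.interval

        def time_to_next_heartbeat(self):
            return max(0.0, self.interval - (time.time() - self.last_send))

        def session_timeout_expired(self):
            return time.time() - self.last_receive > self.session_timeout

        def poll_timeout_expired(self):
            return time.time() - self.last_poll > self.max_poll_interval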
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index d517dd7..344d6c1 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import copy
import collections
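Adding `division` to the `__future__` import is what makes the bare `/ 1000` conversions later in this diff safe on Python 2; without it, integer division would floor small backoffs to zero. A quick check:

    from __future__ import division

    retry_backoff_ms = 100
    assert retry_backoff_ms / 1000 == 0.1   # true division; would be 0 under Py2 floor division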
@@ -90,10 +90,9 @@ class ConsumerCoordinator(BaseCoordinator):
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
- self._cluster.request_update()
- self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
+ self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000
+ self.next_auto_commit_deadline = None
+ self.completed_offset_commits = collections.deque()
- self._auto_commit_task = None
if self.config['enable_auto_commit']:
if self.config['api_version'] < (0, 8, 1):
log.warning('Broker version (%s) does not support offset'
@@ -104,13 +103,14 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('group_id is None: disabling auto-commit.')
self.config['enable_auto_commit'] = False
else:
- interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
- self._auto_commit_task.reschedule()
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
self.consumer_sensors = ConsumerCoordinatorMetrics(
metrics, self.config['metric_group_prefix'], self._subscription)
+ self._cluster.request_update()
+ self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
+
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
@@ -210,8 +210,7 @@ class ConsumerCoordinator(BaseCoordinator):
assignor.on_assignment(assignment)
# reschedule the auto commit starting from now
- if self._auto_commit_task:
- self._auto_commit_task.reschedule()
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
assigned = set(self._subscription.assigned_partitions())
log.info("Setting newly assigned partitions %s for group %s",
@@ -227,6 +226,54 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.listener, self.group_id,
assigned)
+ def poll(self):
+ """
+ Poll for coordinator events. This ensures that the coordinator is known and that the consumer
+ has joined the group (if it is using group management). This also handles periodic offset commits
+ if they are enabled.
+
+ @param now current time in milliseconds
+ """
+ self._invoke_completed_offset_commit_callbacks()
+
+ if self._subscription.partitions_auto_assigned() and self.coordinator_unknown():
+ self.ensure_coordinator_ready()
+
+ if self._subscription.partitions_auto_assigned() and self.need_rejoin():
+ # due to a race condition between the initial metadata fetch and the
+ # initial rebalance, we need to ensure that the metadata is fresh
+ # before joining initially, and then request the metadata update. If
+ # metadata update arrives while the rebalance is still pending (for
+ # example, when the join group is still inflight), then we will lose
+ # track of the fact that we need to rebalance again to reflect the
+ # change to the topic subscription. Without ensuring that the
+ # metadata is fresh, any metadata update that changes the topic
+ # subscriptions and arrives with a rebalance in progress will
+ # essentially be ignored. See KAFKA-3949 for the complete
+ # description of the problem.
+ if self._subscription.subscribed_pattern:
+ self._client.ensure_fresh_metadata() # XXX
+
+ self.ensure_active_group()
+
+ self.poll_heartbeat()
+ self._maybe_auto_commit_offsets_async()
+
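poll() above ends with _maybe_auto_commit_offsets_async(), which this diff does not define; the scheduled AutoCommitTask is being replaced by the next_auto_commit_deadline field set in __init__. As an assumption about where the patch is heading (not the actual implementation), that helper would look roughly like:

    import time

    def _maybe_auto_commit_offsets_async(self):
        """Sketch only: commit asynchronously once the deadline has passed."""
        if not self.config['enable_auto_commit']:
            return
        if self.coordinator_unknown():
            # cannot commit now; retry after the backoff instead of a full interval
            self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
        elif time.time() >= self.next_auto_commit_deadline:
            self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
            self.commit_offsets_async(self._subscription.all_consumed_offsets())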
+ def time_to_next_poll(self):
+ """
+ Return the time to the next needed invocation of {@link #poll(long)}.
+ @param now current time in milliseconds
+ @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
+ """
+ if not self.config['enable_auto_commit']:
+ return self.time_to_next_heartbeat()
+
+ if time.time() > self.next_auto_commit_deadline:
+ return 0
+
+ return min(self.next_auto_commit_deadline - time.time(),
+ self.time_to_next_heartbeat())
+
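time_to_next_poll() returns seconds (it is built from time.time() deltas and time_to_next_heartbeat()), which lets a driver loop bound its network poll by the nearest coordinator deadline. An illustrative driver, assuming a kafka-python client.poll(timeout_ms=...) call and names that are not part of this patch:

    import time

    def run_once(coordinator, client, timeout=1.0):
        """Illustrative driver; not part of the patch."""
        coordinator.poll()                                    # join / heartbeat / auto-commit bookkeeping
        wait = min(timeout, coordinator.time_to_next_poll())  # seconds until the next coordinator deadline
        client.poll(timeout_ms=wait * 1000)                   # bound network I/O by that deadline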
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
@@ -315,7 +362,7 @@ class ConsumerCoordinator(BaseCoordinator):
return {}
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
# contact coordinator to fetch committed offsets
future = self._send_offset_fetch_request(partitions)
@@ -327,7 +374,7 @@ class ConsumerCoordinator(BaseCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def close(self, autocommit=True):
"""Close the coordinator, leave the current group,
@@ -344,6 +391,14 @@ class ConsumerCoordinator(BaseCoordinator):
finally:
super(ConsumerCoordinator, self).close()
+ def _invoke_completed_offset_commit_callbacks(self):
+ try:
+ while True:
+ completion = self.completed_offset_commits.popleft()
+ completion.invoke()
+ except IndexError:
+ pass
+
def commit_offsets_async(self, offsets, callback=None):
"""Commit specific offsets asynchronously.
@@ -386,7 +441,7 @@ class ConsumerCoordinator(BaseCoordinator):
return
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
future = self._send_offset_commit_request(offsets)
self._client.poll(future=future)
@@ -397,7 +452,7 @@ class ConsumerCoordinator(BaseCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _maybe_auto_commit_offsets_sync(self):
if self._auto_commit_task is None:
@@ -675,7 +730,7 @@ class AutoCommitTask(object):
if self._coordinator.coordinator_unknown():
log.debug("Cannot auto-commit offsets for group %s because the"
" coordinator is unknown", self._coordinator.group_id)
- backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
+ backoff = self._coordinator.config['retry_backoff_ms'] / 1000
self.reschedule(time.time() + backoff)
return
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 4115c03..60a687e 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator):
assert coordinator._client.poll.call_count == 0
# general case -- send offset fetch request, get successful future
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_fetch_request',
return_value=Future().success('foobar'))
partitions = [TopicPartition('foobar', 0)]
@@ -295,7 +295,7 @@ def offsets():
def test_commit_offsets_async(mocker, coordinator, offsets):
mocker.patch.object(coordinator._client, 'poll')
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
ret = coordinator.commit_offsets_async(offsets)
@@ -304,7 +304,7 @@ def test_commit_offsets_async(mocker, coordinator, offsets):
def test_commit_offsets_sync(mocker, coordinator, offsets):
- mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
cli = coordinator._client