diff options
-rw-r--r-- | kafka/consumer/group.py | 21 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 583 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 83 | ||||
-rw-r--r-- | test/test_coordinator.py | 6 |
4 files changed, 501 insertions, 192 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index e37be0e..42b59b9 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -263,7 +263,7 @@ class KafkaConsumer(six.Iterator): 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'max_poll_records': 500, 'max_poll_interval_ms': 300000, - 'session_timeout_ms': 10000, + 'session_timeout_ms': 10000, # XXX should be 30000 if < 0.11 'heartbeat_interval_ms': 3000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -294,12 +294,13 @@ class KafkaConsumer(six.Iterator): def __init__(self, *topics, **configs): self.config = copy.copy(self.DEFAULT_CONFIG) + configs_copy = copy.copy(configs) for key in self.config: if key in configs: - self.config[key] = configs.pop(key) + self.config[key] = configs_copy.pop(key) # Only check for extra config keys in top-level class - assert not configs, 'Unrecognized configs: %s' % configs + assert not configs_copy, 'Unrecognized configs: %s' % configs_copy deprecated = {'smallest': 'earliest', 'largest': 'latest'} if self.config['auto_offset_reset'] in deprecated: @@ -602,16 +603,6 @@ class KafkaConsumer(six.Iterator): """ self._coordinator.poll() - """ - if self._use_consumer_group(): - self._coordinator.ensure_coordinator_known() - self._coordinator.ensure_active_group() - - # 0.8.2 brokers support kafka-backed offset storage via group coordinator - elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_known() - """ - # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for if not self._subscription.has_all_fetch_positions(): @@ -1023,12 +1014,12 @@ class KafkaConsumer(six.Iterator): while time.time() < self._consumer_timeout: if self._use_consumer_group(): - self._coordinator.ensure_coordinator_known() + self._coordinator.ensure_coordinator_ready() self._coordinator.ensure_active_group() # 0.8.2 brokers support kafka-backed offset storage via group coordinator elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_known() + self._coordinator.ensure_coordinator_ready() # Fetch offsets for any subscribed partitions that we arent tracking yet if not self._subscription.has_all_fetch_positions(): diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index af0936c..7c1d468 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division import abc import copy import logging +import threading import time import weakref @@ -20,6 +21,28 @@ from ..protocol.group import (HeartbeatRequest, JoinGroupRequest, log = logging.getLogger('kafka.coordinator') +class MemberState(object): + UNJOINED = '<unjoined>' # the client is not part of a group + REBALANCING = '<rebalancing>' # the client has begun rebalancing + STABLE = '<stable>' # the client has joined and is sending heartbeats + + +class Generation(object): + def __init__(self, generation_id, member_id, protocol): + self.generation_id = generation_id + self.member_id = member_id + self.protocol = protocol + +Generation.NO_GENERATION = Generation( + OffsetCommitRequest[2].DEFAULT_GENERATION_ID, + JoinGroupRequest[0].UNKNOWN_MEMBER_ID, + None) + + +class UnjoinedGroupException(Errors.KafkaError): + retriable = True + + class BaseCoordinator(object): """ BaseCoordinator implements group management for a single group member @@ -47,12 +70,21 @@ class BaseCoordinator(object): :meth:`.group_protocols` and the format of the state assignment provided by the leader in :meth:`._perform_assignment` and which becomes available to members in :meth:`._on_join_complete`. + + Note on locking: this class shares state between the caller and a background + thread which is used for sending heartbeats after the client has joined the + group. All mutable state as well as state transitions are protected with the + class's monitor. Generally this means acquiring the lock before reading or + writing the state of the group (e.g. generation, member_id) and holding the + lock when sending a request that affects the state of the group + (e.g. JoinGroup, LeaveGroup). """ DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', - 'session_timeout_ms': 30000, + 'session_timeout_ms': 10000, # XXX 30000 for < 0.11 brokers 'heartbeat_interval_ms': 3000, + 'max_poll_interval_ms': 300000, 'retry_backoff_ms': 100, 'api_version': (0, 9), 'metric_group_prefix': '', @@ -84,14 +116,16 @@ class BaseCoordinator(object): self.config[key] = configs[key] self._client = client - self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.group_id = self.config['group_id'] - self.coordinator_id = None - self.rejoin_needed = True - self.rejoining = False self.heartbeat = Heartbeat(**self.config) - self.heartbeat_task = HeartbeatTask(weakref.proxy(self)) + self._heartbeat_thread = None + self._lock = threading.Condition() + self.rejoin_needed = True + self.rejoining = False # renamed / complement of java needsJoinPrepare + self.state = MemberState.UNJOINED + self.join_future = None + self.coordinator_id = None + self._generation = Generation.NO_GENERATION self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics, self.config['metric_group_prefix']) @@ -102,7 +136,7 @@ class BaseCoordinator(object): @abc.abstractmethod def protocol_type(self): """ - Unique identifier for the class of protocols implements + Unique identifier for the class of supported protocols (e.g. "consumer" or "connect"). Returns: @@ -186,43 +220,51 @@ class BaseCoordinator(object): Returns: bool: True if the coordinator is unknown """ - if self.coordinator_id is None: - return True + return self.coordinator() is None - if self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead('Node Disconnected') - return True + def coordinator(self): + """Get the current coordinator - return False + Returns: the current coordinator id or None if it is unknown + """ + with self._lock: + if self.coordinator_id is None: + return None + elif self._client.is_disconnected(self.coordinator_id): + self.coordinator_dead('Node Disconnected') + return None + else: + return self.coordinator_id - def ensure_coordinator_known(self): + def ensure_coordinator_ready(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - while self.coordinator_unknown(): - - # Prior to 0.8.2 there was no group coordinator - # so we will just pick a node at random and treat - # it as the "coordinator" - if self.config['api_version'] < (0, 8, 2): - self.coordinator_id = self._client.least_loaded_node() - if self.coordinator_id is not None: - self._client.ready(self.coordinator_id) - continue - - future = self._send_group_coordinator_request() - self._client.poll(future=future) - - if future.failed(): - if future.retriable(): - if getattr(future.exception, 'invalid_metadata', False): - log.debug('Requesting metadata for group coordinator request: %s', future.exception) - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) + with self._lock: + while self.coordinator_unknown(): + + # Prior to 0.8.2 there was no group coordinator + # so we will just pick a node at random and treat + # it as the "coordinator" + if self.config['api_version'] < (0, 8, 2): + self.coordinator_id = self._client.least_loaded_node() + if self.coordinator_id is not None: + self._client.ready(self.coordinator_id) + continue + + future = self._send_group_coordinator_request() + self._client.poll(future=future) + + if future.failed(): + if future.retriable(): + if getattr(future.exception, 'invalid_metadata', False): + log.debug('Requesting metadata for group coordinator request: %s', future.exception) + metadata_update = self._client.cluster.request_update() + self._client.poll(future=metadata_update) + else: + time.sleep(self.config['retry_backoff_ms'] / 1000) else: - time.sleep(self.config['retry_backoff_ms'] / 1000) - else: - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) @@ -230,49 +272,117 @@ class BaseCoordinator(object): Returns: bool: True if it should, False otherwise """ - return self.rejoin_needed + with self._lock: + return self.rejoin_needed + + def poll_heartbeat(self): + """ + Check the status of the heartbeat thread (if it is active) and indicate + the liveness of the client. This must be called periodically after + joining with :meth:`.ensureActiveGroup` to ensure that the member stays + in the group. If an interval of time longer than the provided rebalance + timeout expires without calling this method, then the client will + proactively leave the group. + + Raises: RuntimeError for unexpected errors raised from the heartbeat thread + """ + with self._lock: + if self._heartbeat_thread is not None: + if self._heartbeat_thread.failed: + # set the heartbeat thread to null and raise an exception. + # If the user catches it, the next call to ensure_active_group() + # will spawn a new heartbeat thread. + cause = self._heartbeat_thread.failed + self._heartbeat_thread = None + raise cause + self.heartbeat.poll() + + def time_to_next_heartbeat(self): + with self._lock: + # if we have not joined the group, we don't need to send heartbeats + if self.state is MemberState.UNJOINED: + return sys.maxsize + return self.heartbeat.time_to_next_heartbeat() + + def _handle_join_success(self, member_assignment_bytes): + with self._lock: + log.info("Successfully joined group %s with generation %s", + self.group_id, self._generation.generation_id) + self.join_future = None + self.state = MemberState.STABLE + self.rejoining = False + self._heartbeat_thread.enable() + self._on_join_complete(self._generation.generation_id, + self._generation.member_id, + self._generation.protocol, + member_assignment_bytes) + + def _handle_join_failure(self, exception): + with self._lock: + self.join_future = None + self.state = MemberState.UNJOINED def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" - if not self.need_rejoin(): - return - - if not self.rejoining: - self._on_join_prepare(self.generation, self.member_id) - self.rejoining = True - - while self.need_rejoin(): - self.ensure_coordinator_known() - - # ensure that there are no pending requests to the coordinator. - # This is important in particular to avoid resending a pending - # JoinGroup request. - while not self.coordinator_unknown(): - if not self._client.in_flight_request_count(self.coordinator_id): - break - self._client.poll(delayed_tasks=False) - else: - continue - - future = self._send_join_group_request() - self._client.poll(future=future) + with self._lock: + if not self.need_rejoin(): + return - if future.succeeded(): - member_assignment_bytes = future.value - self._on_join_complete(self.generation, self.member_id, - self.protocol, member_assignment_bytes) - self.rejoining = False - self.heartbeat_task.reset() - else: - assert future.failed() - exception = future.exception - if isinstance(exception, (Errors.UnknownMemberIdError, - Errors.RebalanceInProgressError, - Errors.IllegalGenerationError)): + # call on_join_prepare if needed. We set a flag to make sure that + # we do not call it a second time if the client is woken up before + # a pending rebalance completes. + if not self.rejoining: + self._on_join_prepare(self._generation.generation_id, + self._generation.member_id) + self.rejoining = True + + if self._heartbeat_thread is None: + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.start() + + while self.need_rejoin(): + self.ensure_coordinator_ready() + + # ensure that there are no pending requests to the coordinator. + # This is important in particular to avoid resending a pending + # JoinGroup request. + while not self.coordinator_unknown(): + if not self._client.in_flight_request_count(self.coordinator_id): + break + self._client.poll(delayed_tasks=False) + else: continue - elif not future.retriable(): - raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + + # we store the join future in case we are woken up by the user + # after beginning the rebalance in the call to poll below. + # This ensures that we do not mistakenly attempt to rejoin + # before the pending rebalance has completed. + if self.join_future is None: + self.state = MemberState.REBALANCING + self.join_future = self._send_join_group_request() + + # handle join completion in the callback so that the + # callback will be invoked even if the consumer is woken up + # before finishing the rebalance + self.join_future.add_callback(self._handle_join_success) + + # we handle failures below after the request finishes. + # If the join completes after having been woken up, the + # exception is ignored and we will rejoin + self.join_future.add_errback(self._handle_join_failure) + + future = self.join_future + self._client.poll(future=future) + + if future.failed(): + exception = future.exception + if isinstance(exception, (Errors.UnknownMemberIdError, + Errors.RebalanceInProgressError, + Errors.IllegalGenerationError)): + continue + elif not future.retriable(): + raise exception # pylint: disable-msg=raising-bad-type + time.sleep(self.config['retry_backoff_ms'] / 1000) def _send_join_group_request(self): """Join the group and return the assignment for the next generation. @@ -294,14 +404,35 @@ class BaseCoordinator(object): # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) - request = JoinGroupRequest[0]( - self.group_id, - self.config['session_timeout_ms'], - self.member_id, - self.protocol_type(), - [(protocol, - metadata if isinstance(metadata, bytes) else metadata.encode()) - for protocol, metadata in self.group_protocols()]) + member_metadata = [ + (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) + for protocol, metadata in self.group_protocols() + ] + if self.config['api_version'] < (0, 9): + raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') + elif (0, 9) <= self.config['api_version'] < (0, 10, 1): + request = JoinGroupRequest[0]( + self.group_id, + self.config['session_timeout_ms'], # or max(session, max_poll_interval_ms) ? + self.member_id, + self.protocol_type(), + member_metadata) + elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0): + request = JoinGroupRequest[1]( + self.group_id, + self.config['session_timeout_ms'], + self.config['max_poll_interval_ms'], + self.member_id, + self.protocol_type(), + member_metadata) + else: + request = JoinGroupRequest[2]( + self.group_id, + self.config['session_timeout_ms'], + self.config['max_poll_interval_ms'], + self.member_id, + self.protocol_type(), + member_metadata) # create the request for the coordinator log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) @@ -327,19 +458,25 @@ class BaseCoordinator(object): if error_type is Errors.NoError: log.debug("Received successful JoinGroup response for group %s: %s", self.group_id, response) - self.member_id = response.member_id - self.generation = response.generation_id - self.rejoin_needed = False - self.protocol = response.group_protocol - log.info("Joined group '%s' (generation %s) with member_id %s", - self.group_id, self.generation, self.member_id) self.sensors.join_latency.record((time.time() - send_time) * 1000) - if response.leader_id == response.member_id: - log.info("Elected group leader -- performing partition" - " assignments using %s", self.protocol) - self._on_join_leader(response).chain(future) - else: - self._on_join_follower().chain(future) + with self._lock: + if self.state is not MemberState.REBALANCING: + # if the consumer was woken up before a rebalance completes, + # we may have already left the group. In this case, we do + # not want to continue with the sync group. + future.failure(UnjoinedGroupException()) + else: + self._generation = Generation(response.generation_id, + response.member_id, + response.group_protocol) + self.rejoin_needed = False + + if response.leader_id == response.member_id: + log.info("Elected group leader -- performing partition" + " assignments using %s", self.protocol) + self._on_join_leader(response).chain(future) + else: + self._on_join_follower().chain(future) elif error_type is Errors.GroupLoadInProgressError: log.debug("Attempt to join group %s rejected since coordinator %s" @@ -349,7 +486,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self.member_id) - self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID + self.reset_generation() log.debug("Attempt to join group %s failed due to unknown member id", self.group_id) future.failure(error) @@ -379,10 +516,11 @@ class BaseCoordinator(object): def _on_join_follower(self): # send follower's sync group with an empty assignment - request = SyncGroupRequest[0]( + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + request = SyncGroupRequest[version]( self.group_id, - self.generation, - self.member_id, + self._generation.generation_id, + self._generation.member_id, {}) log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) @@ -406,10 +544,11 @@ class BaseCoordinator(object): except Exception as e: return Future().failure(e) - request = SyncGroupRequest[0]( + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + request = SyncGroupRequest[version]( self.group_id, - self.generation, - self.member_id, + self._generation.generation_id, + self._generation.member_id, [(member_id, assignment if isinstance(assignment, bytes) else assignment.encode()) for member_id, assignment in six.iteritems(group_assignment)]) @@ -439,14 +578,12 @@ class BaseCoordinator(object): def _handle_sync_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.info("Successfully joined group %s with generation %s", - self.group_id, self.generation) self.sensors.sync_latency.record((time.time() - send_time) * 1000) future.success(response.member_assignment) return # Always rejoin on error - self.rejoin_needed = True + self.request_rejoin() if error_type is Errors.GroupAuthorizationFailedError: future.failure(error_type(self.group_id)) elif error_type is Errors.RebalanceInProgressError: @@ -457,7 +594,7 @@ class BaseCoordinator(object): Errors.IllegalGenerationError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID + self.reset_generation() future.failure(error) elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): @@ -495,30 +632,24 @@ class BaseCoordinator(object): def _handle_group_coordinator_response(self, future, response): log.debug("Received group coordinator response %s", response) - if not self.coordinator_unknown(): - # We already found the coordinator, so ignore the request - log.debug("Coordinator already known -- ignoring metadata response") - future.success(self.coordinator_id) - return error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - ok = self._client.cluster.add_group_coordinator(self.group_id, response) - if not ok: - # This could happen if coordinator metadata is different - # than broker metadata - future.failure(Errors.IllegalStateError()) - return - - self.coordinator_id = response.coordinator_id - log.info("Discovered coordinator %s for group %s", - self.coordinator_id, self.group_id) - self._client.ready(self.coordinator_id) - - # start sending heartbeats only if we have a valid generation - if self.generation > 0: - self.heartbeat_task.reset() + with self._lock: + ok = self._client.cluster.add_group_coordinator(self.group_id, response) + if not ok: + # This could happen if coordinator metadata is different + # than broker metadata + future.failure(Errors.IllegalStateError()) + return + + self.coordinator_id = response.coordinator_id + log.info("Discovered coordinator %s for group %s", + self.coordinator_id, self.group_id) + self._client.ready(self.coordinator_id) + self.heartbeat.reset_timeouts() future.success(self.coordinator_id) + elif error_type is Errors.GroupCoordinatorNotAvailableError: log.debug("Group Coordinator Not Available; retry") future.failure(error_type()) @@ -528,45 +659,74 @@ class BaseCoordinator(object): future.failure(error) else: error = error_type() - log.error("Unrecognized failure in Group Coordinator Request: %s", - error) + log.error("Group coordinator lookup for group %s failed: %s", + self.group_id, error) future.failure(error) def coordinator_dead(self, error): """Mark the current coordinator as dead.""" - if self.coordinator_id is not None: - log.warning("Marking the coordinator dead (node %s) for group %s: %s.", - self.coordinator_id, self.group_id, error) - self.coordinator_id = None + with self._lock: + if self.coordinator_id is not None: + log.warning("Marking the coordinator dead (node %s) for group %s: %s.", + self.coordinator_id, self.group_id, error) + self.coordinator_id = None + + def generation(self): + """Get the current generation state if the group is stable. + + Returns: the current generation or None if the group is unjoined/rebalancing + """ + with self._lock: + if self.state is not MemberState.STABLE: + return None + return self._generation + + def reset_generation(self): + """Reset the generation and memberId because we have fallen out of the group.""" + with self._lock: + self._generation = Generation.NO_GENERATION + self.rejoin_needed = True + self.state = MemberState.UNJOINED + + def request_rejoin(self): + with self._lock: + self.rejoin_needed = True def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" - try: - self._client.unschedule(self.heartbeat_task) - except KeyError: - pass - - if not self.coordinator_unknown() and self.generation > 0: - # this is a minimal effort attempt to leave the group. we do not - # attempt any resending if the request fails or times out. - log.info('Leaving consumer group (%s).', self.group_id) - request = LeaveGroupRequest[0](self.group_id, self.member_id) - future = self._client.send(self.coordinator_id, request) - future.add_callback(self._handle_leave_group_response) - future.add_errback(log.error, "LeaveGroup request failed: %s") - self._client.poll(future=future) - - self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID - self.rejoin_needed = True + with self._lock: + if self._heartbeat_thread is not None: + self._heartbeat_thread.close() + self.maybe_leave_group() + + def maybe_leave_group(self): + """Leave the current group and reset local generation/memberId.""" + with self._lock: + if (not self.coordinator_unknown() + and self.state is not MemberState.UNJOINED + and self._generation is not Generation.NO_GENERATION): + + # this is a minimal effort attempt to leave the group. we do not + # attempt any resending if the request fails or times out. + log.info('Leaving consumer group (%s).', self.group_id) + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + future = self._client.send(self.coordinator_id, request) + future.add_callback(self._handle_leave_group_response) + future.add_errback(log.error, "LeaveGroup request failed: %s") + self._client.poll(future=future, wakeup=False) # XXX + + self.reset_generation() def _handle_leave_group_response(self, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.info("LeaveGroup request succeeded") + log.debug("LeaveGroup request for group %s returned successfully", + self.group_id) else: - log.error("LeaveGroup request failed: %s", error_type()) + log.error("LeaveGroup request for group %s failed with error: %s", + self.group_id, error_type()) def _send_heartbeat_request(self): """Send a heartbeat request""" @@ -578,7 +738,10 @@ class BaseCoordinator(object): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id) + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + request = HeartbeatRequest[version](self.group_id, + self._generation.generation_id, + self._generation.member_id) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) @@ -598,24 +761,23 @@ class BaseCoordinator(object): Errors.NotCoordinatorForGroupError): log.warning("Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", self.group_id, - self.coordinator_id) + self.coordinator()) self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: log.warning("Heartbeat failed for group %s because it is" " rebalancing", self.group_id) - self.rejoin_needed = True + self.request_rejoin() future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: log.warning("Heartbeat failed for group %s: generation id is not " " current.", self.group_id) - self.rejoin_needed = True + self.reset_generation() future.failure(error_type()) elif error_type is Errors.UnknownMemberIdError: log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") - self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID - self.rejoin_needed = True + self.reset_generation() future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: error = error_type(self.group_id) @@ -743,6 +905,107 @@ class GroupCoordinatorMetrics(object): metrics.add_metric(metrics.metric_name( 'last-heartbeat-seconds-ago', self.metric_group_name, - 'The number of seconds since the last controller heartbeat', + 'The number of seconds since the last controller heartbeat was sent', tags), AnonMeasurable( lambda _, now: (now / 1000) - self.heartbeat.last_send)) + + +class HeartbeatThread(threading.Thread): + def __init__(self, coordinator): + super(HeartbeatThread, self).__init__() + self.coordinator = coordinator + self.enabled = False + self.closed = False + self.failed = None + + def enable(self): + with self.coordinator._lock: + self.enabled = True + self.coordinator.heartbeat.reset_timeouts() + self.coordinator._lock.notify() + + def disable(self): + with self.coordinator._lock: + self.enabled = False + + def close(self): + with self.coordinator._lock: + self.closed = True + self.coordinator._lock.notify() + + def run(self): + try: + find_coordinator_future = None + + while True: + with self.coordinator._lock: + if self.closed: + return + + if not self.enabled: + self.coordinator._lock.wait() + continue + + if self.coordinator.state is not MemberState.STABLE: + # the group is not stable (perhaps because we left the + # group or because the coordinator kicked us out), so + # disable heartbeats and wait for the main thread to rejoin. + self.disable() + continue + + # Disabling wakeup here is intended to prevent a call to + # consumer.wakeup() to propagate an exception to this + # heartbeat thread + self.coordinator._client.poll(wakeup=False) # XXX + + if self.coordinator.coordinator_unknown(): + if find_coordinator_future is None or find_coordinator_future.is_done(): + find_coordinator_future = self.lookup_coordinator() + else: + self.coordinator._lock.wait(self.config['retry_backoff_ms'] / 1000) + + elif self.coordinator.heartbeat.session_timeout_expired(): + # the session timeout has expired without seeing a + # successful heartbeat, so we should probably make sure + # the coordinator is still healthy. + self.coordinator.coordinator_dead() + + elif self.coordinator.heartbeat.poll_timeout_expired(): + # the poll timeout has expired, which means that the + # foreground thread has stalled in between calls to + # poll(), so we explicitly leave the group. + self.coordinator.maybe_leave_group() + + elif not self.coordinator.heartbeat.should_heartbeat(): + # poll again after waiting for the retry backoff in case + # the heartbeat failed or the coordinator disconnected + self.coordinator._lock.wait(self.config['retry_backoff_ms'] / 1000) + + else: + self.coordinator.heartbeat.send_heartbeat() + future = self.coordinator.send_heartbeat_request() + future.add_callback(self._handle_heartbeat_success) + future.add_errback(self._handle_heartbeat_failure) + + except RuntimeError as e: + log.error("Heartbeat thread for group %s failed due to unexpected error", + self.coordinator.group_id, e) + self.failed = e + + def _handle_heartbeat_success(self, result): + with self.coordinator._lock: + self.coordinator.heartbeat.receive_heartbeat() + + def _handle_heartbeat_failure(self, exception): + with self.coordinator._lock: + if isinstance(exception, Errors.RebalanceInProgressError): + # it is valid to continue heartbeating while the group is + # rebalancing. This ensures that the coordinator keeps the + # member in the group for as long as the duration of the + # rebalance timeout. If we stop sending heartbeats, however, + # then the session timeout may expire before we can rejoin. + self.coordinator.heartbeat.receive_heartbeat() + else: + self.coordinator.heartbeat.fail_heartbeat() + # wake up the thread if it's sleeping to reschedule the heartbeat + self.coordinator._lock.notify() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index d517dd7..344d6c1 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import copy import collections @@ -90,10 +90,9 @@ class ConsumerCoordinator(BaseCoordinator): self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) self._assignment_snapshot = None self._cluster = client.cluster - self._cluster.request_update() - self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000 + self.next_auto_commit_deadline = None - self._auto_commit_task = None if self.config['enable_auto_commit']: if self.config['api_version'] < (0, 8, 1): log.warning('Broker version (%s) does not support offset' @@ -104,13 +103,14 @@ class ConsumerCoordinator(BaseCoordinator): log.warning('group_id is None: disabling auto-commit.') self.config['enable_auto_commit'] = False else: - interval = self.config['auto_commit_interval_ms'] / 1000.0 - self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) - self._auto_commit_task.reschedule() + self.next_auto_commit_deadline = time.time() + self.auto_commit_interval self.consumer_sensors = ConsumerCoordinatorMetrics( metrics, self.config['metric_group_prefix'], self._subscription) + self._cluster.request_update() + self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + def __del__(self): if hasattr(self, '_cluster') and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) @@ -210,8 +210,7 @@ class ConsumerCoordinator(BaseCoordinator): assignor.on_assignment(assignment) # reschedule the auto commit starting from now - if self._auto_commit_task: - self._auto_commit_task.reschedule() + self.next_auto_commit_deadline = time.time() + self.auto_commit_interval assigned = set(self._subscription.assigned_partitions()) log.info("Setting newly assigned partitions %s for group %s", @@ -227,6 +226,54 @@ class ConsumerCoordinator(BaseCoordinator): self._subscription.listener, self.group_id, assigned) + def poll(self): + """ + Poll for coordinator events. This ensures that the coordinator is known and that the consumer + has joined the group (if it is using group management). This also handles periodic offset commits + if they are enabled. + + @param now current time in milliseconds + """ + self._invoke_completed_offset_commit_callbacks() + + if self._subscription.partitions_auto_assigned() and self.coordinator_unknown(): + self.ensure_coordinator_ready() + + if self._subscription.partitions_auto_assigned() and self.need_rejoin(): + # due to a race condition between the initial metadata fetch and the + # initial rebalance, we need to ensure that the metadata is fresh + # before joining initially, and then request the metadata update. If + # metadata update arrives while the rebalance is still pending (for + # example, when the join group is still inflight), then we will lose + # track of the fact that we need to rebalance again to reflect the + # change to the topic subscription. Without ensuring that the + # metadata is fresh, any metadata update that changes the topic + # subscriptions and arrives with a rebalance in progress will + # essentially be ignored. See KAFKA-3949 for the complete + # description of the problem. + if self._subscription.subscribed_pattern: + self._client.ensure_fresh_metadata() # XXX + + self.ensure_active_group() + + self.poll_heartbeat() + self._maybe_auto_commit_offsets_async() + + def time_to_next_poll(self): + """ + Return the time to the next needed invocation of {@link #poll(long)}. + @param now current time in milliseconds + @return the maximum time in milliseconds the caller should wait before the next invocation of poll() + """ + if not self.config['enable_auto_commit']: + return self.time_to_next_heartbeat() + + if time.time() > self.next_auto_commit_deadline: + return 0 + + return min(self.next_auto_commit_deadline - time.time(), + self.time_to_next_heartbeat()) + def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy @@ -315,7 +362,7 @@ class ConsumerCoordinator(BaseCoordinator): return {} while True: - self.ensure_coordinator_known() + self.ensure_coordinator_ready() # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) @@ -327,7 +374,7 @@ class ConsumerCoordinator(BaseCoordinator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000) def close(self, autocommit=True): """Close the coordinator, leave the current group, @@ -344,6 +391,14 @@ class ConsumerCoordinator(BaseCoordinator): finally: super(ConsumerCoordinator, self).close() + def invoke_completed_offset_commit_callbacks(self): + try: + while True: + completion = self.completed_offset_commits.popleft() + completion.invoke() + except IndexError: + pass + def commit_offsets_async(self, offsets, callback=None): """Commit specific offsets asynchronously. @@ -386,7 +441,7 @@ class ConsumerCoordinator(BaseCoordinator): return while True: - self.ensure_coordinator_known() + self.ensure_coordinator_ready() future = self._send_offset_commit_request(offsets) self._client.poll(future=future) @@ -397,7 +452,7 @@ class ConsumerCoordinator(BaseCoordinator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000) def _maybe_auto_commit_offsets_sync(self): if self._auto_commit_task is None: @@ -675,7 +730,7 @@ class AutoCommitTask(object): if self._coordinator.coordinator_unknown(): log.debug("Cannot auto-commit offsets for group %s because the" " coordinator is unknown", self._coordinator.group_id) - backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0 + backoff = self._coordinator.config['retry_backoff_ms'] / 1000 self.reschedule(time.time() + backoff) return diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4115c03..60a687e 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator): assert coordinator._client.poll.call_count == 0 # general case -- send offset fetch request, get successful future - mocker.patch.object(coordinator, 'ensure_coordinator_known') + mocker.patch.object(coordinator, 'ensure_coordinator_ready') mocker.patch.object(coordinator, '_send_offset_fetch_request', return_value=Future().success('foobar')) partitions = [TopicPartition('foobar', 0)] @@ -295,7 +295,7 @@ def offsets(): def test_commit_offsets_async(mocker, coordinator, offsets): mocker.patch.object(coordinator._client, 'poll') - mocker.patch.object(coordinator, 'ensure_coordinator_known') + mocker.patch.object(coordinator, 'ensure_coordinator_ready') mocker.patch.object(coordinator, '_send_offset_commit_request', return_value=Future().success('fizzbuzz')) ret = coordinator.commit_offsets_async(offsets) @@ -304,7 +304,7 @@ def test_commit_offsets_async(mocker, coordinator, offsets): def test_commit_offsets_sync(mocker, coordinator, offsets): - mocker.patch.object(coordinator, 'ensure_coordinator_known') + mocker.patch.object(coordinator, 'ensure_coordinator_ready') mocker.patch.object(coordinator, '_send_offset_commit_request', return_value=Future().success('fizzbuzz')) cli = coordinator._client |