diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-02-02 16:36:30 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-02 16:36:30 -0800 |
commit | 618c5051493693c1305aa9f08e8a0583d5fcf0e3 (patch) | |
tree | 3a2fcec8260915a83f19a603671c4a0e5461cca0 | |
parent | 08a7fb7b754a754c6c64e96d4ba5c4f56cf38a5f (diff) | |
download | kafka-python-618c5051493693c1305aa9f08e8a0583d5fcf0e3.tar.gz |
KAFKA-3949: Avoid race condition when subscription changes during rebalance (#1364)
-rw-r--r-- | kafka/cluster.py | 7 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 6 | ||||
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 30 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 24 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 102 | ||||
-rw-r--r-- | test/test_coordinator.py | 58 |
7 files changed, 128 insertions, 109 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index d646fdf..1ab4218 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -291,6 +291,13 @@ class ClusterMetadata(object): for listener in self._listeners: listener(self) + if self.need_all_topic_metadata: + # the listener may change the interested topics, + # which could cause another metadata refresh. + # If we have already fetched all topics, however, + # another fetch should be unnecessary. + self._need_update = False + def add_listener(self, listener): """Add a callback function to be called on each metadata update""" self._listeners.add(listener) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index afb8f52..f9fcb37 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -326,9 +326,6 @@ class Fetcher(six.Iterator): max_records = self.config['max_poll_records'] assert max_records > 0 - if self._subscriptions.needs_partition_assignment: - return {}, False - drained = collections.defaultdict(list) records_remaining = max_records @@ -397,9 +394,6 @@ class Fetcher(six.Iterator): def _message_generator(self): """Iterate over fetched_records""" - if self._subscriptions.needs_partition_assignment: - raise StopIteration('Subscription needs partition assignment') - while self._next_partition_records or self._completed_fetches: if not self._next_partition_records: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0224d16..1c1f1e8 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -644,6 +644,11 @@ class KafkaConsumer(six.Iterator): timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll()) self._client.poll(timeout_ms=timeout_ms) + # after the long poll, we should check whether the group needs to rebalance + # prior to returning data so that the group can stabilize faster + if self._coordinator.need_rejoin(): + return {} + records, _ = self._fetcher.fetched_records(max_records) return records @@ -1055,6 +1060,11 @@ class KafkaConsumer(six.Iterator): poll_ms = 0 self._client.poll(timeout_ms=poll_ms) + # after the long poll, we should check whether the group needs to rebalance + # prior to returning data so that the group can stabilize faster + if self._coordinator.need_rejoin(): + continue + # We need to make sure we at least keep up with scheduled tasks, # like heartbeats, auto-commits, and metadata refreshes timeout_at = self._next_timeout() diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 3d4dfef..10d722e 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -68,7 +68,6 @@ class SubscriptionState(object): self._group_subscription = set() self._user_assignment = set() self.assignment = dict() - self.needs_partition_assignment = False self.listener = None # initialize to true for the consumers to fetch offset upon starting up @@ -172,7 +171,6 @@ class SubscriptionState(object): log.info('Updating subscribed topics to: %s', topics) self.subscription = set(topics) self._group_subscription.update(topics) - self.needs_partition_assignment = True # Remove any assigned partitions which are no longer subscribed to for tp in set(self.assignment.keys()): @@ -192,12 +190,12 @@ class SubscriptionState(object): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) self._group_subscription.update(topics) - def mark_for_reassignment(self): + def reset_group_subscription(self): + """Reset the group's subscription to only contain topics subscribed by this consumer.""" if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) assert self.subscription is not None, 'Subscription required' self._group_subscription.intersection_update(self.subscription) - self.needs_partition_assignment = True def assign_from_user(self, partitions): """Manually assign a list of TopicPartitions to this consumer. @@ -220,18 +218,17 @@ class SubscriptionState(object): if self.subscription is not None: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - self._user_assignment.clear() - self._user_assignment.update(partitions) + if self._user_assignment != set(partitions): + self._user_assignment = set(partitions) - for partition in partitions: - if partition not in self.assignment: - self._add_assigned_partition(partition) + for partition in partitions: + if partition not in self.assignment: + self._add_assigned_partition(partition) - for tp in set(self.assignment.keys()) - self._user_assignment: - del self.assignment[tp] + for tp in set(self.assignment.keys()) - self._user_assignment: + del self.assignment[tp] - self.needs_partition_assignment = False - self.needs_fetch_committed_offsets = True + self.needs_fetch_committed_offsets = True def assign_from_subscribed(self, assignments): """Update the assignment to the specified partitions @@ -245,16 +242,18 @@ class SubscriptionState(object): assignments (list of TopicPartition): partitions to assign to this consumer instance. """ - if self.subscription is None: + if not self.partitions_auto_assigned(): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) for tp in assignments: if tp.topic not in self.subscription: raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp)) + + # after rebalancing, we always reinitialize the assignment state self.assignment.clear() for tp in assignments: self._add_assigned_partition(tp) - self.needs_partition_assignment = False + self.needs_fetch_committed_offsets = True log.info("Updated partition assignment: %s", assignments) def unsubscribe(self): @@ -262,7 +261,6 @@ class SubscriptionState(object): self.subscription = None self._user_assignment.clear() self.assignment.clear() - self.needs_partition_assignment = True self.subscribed_pattern = None def group_subscription(self): diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 301c06d..820fc1f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -344,23 +344,25 @@ class BaseCoordinator(object): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" with self._lock: - if not self.need_rejoin(): - return - - # call on_join_prepare if needed. We set a flag to make sure that - # we do not call it a second time if the client is woken up before - # a pending rebalance completes. - if not self.rejoining: - self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) - self.rejoining = True - if self._heartbeat_thread is None: self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() + # call on_join_prepare if needed. We set a flag + # to make sure that we do not call it a second + # time if the client is woken up before a pending + # rebalance completes. This must be called on each + # iteration of the loop because an event requiring + # a rebalance (such as a metadata refresh which + # changes the matched subscription set) can occur + # while another rebalance is still in progress. + if not self.rejoining: + self._on_join_prepare(self._generation.generation_id, + self._generation.member_id) + self.rejoining = True + # ensure that there are no pending requests to the coordinator. # This is important in particular to avoid resending a pending # JoinGroup request. diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index ab30883..9438a7e 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -84,6 +84,8 @@ class ConsumerCoordinator(BaseCoordinator): self.config[key] = configs[key] self._subscription = subscription + self._is_leader = False + self._joined_subscription = set() self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) self._assignment_snapshot = None self._cluster = client.cluster @@ -132,11 +134,22 @@ class ConsumerCoordinator(BaseCoordinator): def group_protocols(self): """Returns list of preferred (protocols, metadata)""" - topics = self._subscription.subscription - assert topics is not None, 'Consumer has not subscribed to topics' + if self._subscription.subscription is None: + raise Errors.IllegalStateError('Consumer has not subscribed to topics') + # dpkp note: I really dislike this. + # why? because we are using this strange method group_protocols, + # which is seemingly innocuous, to set internal state (_joined_subscription) + # that is later used to check whether metadata has changed since we joined a group + # but there is no guarantee that this method, group_protocols, will get called + # in the correct sequence or that it will only be called when we want it to be. + # So this really should be moved elsewhere, but I don't have the energy to + # work that out right now. If you read this at some later date after the mutable + # state has bitten you... I'm sorry! It mimics the java client, and that's the + # best I've got for now. + self._joined_subscription = set(self._subscription.subscription) metadata_list = [] for assignor in self.config['assignors']: - metadata = assignor.metadata(topics) + metadata = assignor.metadata(self._joined_subscription) group_protocol = (assignor.name, metadata) metadata_list.append(group_protocol) return metadata_list @@ -158,21 +171,29 @@ class ConsumerCoordinator(BaseCoordinator): # check if there are any changes to the metadata which should trigger # a rebalance - if self._subscription_metadata_changed(cluster): - - if (self.config['api_version'] >= (0, 9) - and self.config['group_id'] is not None): - - self._subscription.mark_for_reassignment() - - # If we haven't got group coordinator support, - # just assign all partitions locally - else: - self._subscription.assign_from_subscribed([ - TopicPartition(topic, partition) - for topic in self._subscription.subscription - for partition in self._metadata_snapshot[topic] - ]) + if self._subscription.partitions_auto_assigned(): + metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster) + if self._metadata_snapshot != metadata_snapshot: + self._metadata_snapshot = metadata_snapshot + + # If we haven't got group coordinator support, + # just assign all partitions locally + if self._auto_assign_all_partitions(): + self._subscription.assign_from_subscribed([ + TopicPartition(topic, partition) + for topic in self._subscription.subscription + for partition in self._metadata_snapshot[topic] + ]) + + def _auto_assign_all_partitions(self): + # For users that use "subscribe" without group support, + # we will simply assign all partitions to this consumer + if self.config['api_version'] < (0, 9): + return True + elif self.config['group_id'] is None: + return True + else: + return False def _build_metadata_snapshot(self, subscription, cluster): metadata_snapshot = {} @@ -181,16 +202,6 @@ class ConsumerCoordinator(BaseCoordinator): metadata_snapshot[topic] = set(partitions) return metadata_snapshot - def _subscription_metadata_changed(self, cluster): - if not self._subscription.partitions_auto_assigned(): - return False - - metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster) - if self._metadata_snapshot != metadata_snapshot: - self._metadata_snapshot = metadata_snapshot - return True - return False - def _lookup_assignor(self, name): for assignor in self.config['assignors']: if assignor.name == name: @@ -199,12 +210,10 @@ class ConsumerCoordinator(BaseCoordinator): def _on_join_complete(self, generation, member_id, protocol, member_assignment_bytes): - # if we were the assignor, then we need to make sure that there have - # been no metadata updates since the rebalance begin. Otherwise, we - # won't rebalance again until the next metadata change - if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot: - self._subscription.mark_for_reassignment() - return + # only the leader is responsible for monitoring for metadata changes + # (i.e. partition changes) + if not self._is_leader: + self._assignment_snapshot = None assignor = self._lookup_assignor(protocol) assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol @@ -307,6 +316,7 @@ class ConsumerCoordinator(BaseCoordinator): # keep track of the metadata used for assignment so that we can check # after rebalance completion whether anything has changed self._cluster.request_update() + self._is_leader = True self._assignment_snapshot = self._metadata_snapshot log.debug("Performing assignment for group %s using strategy %s" @@ -338,8 +348,8 @@ class ConsumerCoordinator(BaseCoordinator): " for group %s failed on_partitions_revoked", self._subscription.listener, self.group_id) - self._assignment_snapshot = None - self._subscription.mark_for_reassignment() + self._is_leader = False + self._subscription.reset_group_subscription() def need_rejoin(self): """Check whether the group should be rejoined @@ -347,9 +357,23 @@ class ConsumerCoordinator(BaseCoordinator): Returns: bool: True if consumer should rejoin group, False otherwise """ - return (self._subscription.partitions_auto_assigned() and - (super(ConsumerCoordinator, self).need_rejoin() or - self._subscription.needs_partition_assignment)) + if not self._subscription.partitions_auto_assigned(): + return False + + if self._auto_assign_all_partitions(): + return False + + # we need to rejoin if we performed the assignment and metadata has changed + if (self._assignment_snapshot is not None + and self._assignment_snapshot != self._metadata_snapshot): + return True + + # we need to join if our subscription has changed since the last join + if (self._joined_subscription is not None + and self._joined_subscription != self._subscription.subscription): + return True + + return super(ConsumerCoordinator, self).need_rejoin() def refresh_committed_offsets_if_needed(self): """Fetch committed offsets for assigned partitions.""" diff --git a/test/test_coordinator.py b/test/test_coordinator.py index e094b9c..7a2627e 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -62,7 +62,7 @@ def test_group_protocols(coordinator): # Requires a subscription try: coordinator.group_protocols() - except AssertionError: + except Errors.IllegalStateError: pass else: assert False, 'Exception not raised when expected' @@ -85,8 +85,7 @@ def test_pattern_subscription(coordinator, api_version): coordinator.config['api_version'] = api_version coordinator._subscription.subscribe(pattern='foo') assert coordinator._subscription.subscription == set([]) - assert coordinator._subscription_metadata_changed({}) is False - assert coordinator._subscription.needs_partition_assignment is False + assert coordinator._metadata_snapshot == coordinator._build_metadata_snapshot(coordinator._subscription, {}) cluster = coordinator._client.cluster cluster.update_metadata(MetadataResponse[0]( @@ -100,12 +99,10 @@ def test_pattern_subscription(coordinator, api_version): # 0.9 consumers should trigger dynamic partition assignment if api_version >= (0, 9): - assert coordinator._subscription.needs_partition_assignment is True assert coordinator._subscription.assignment == {} # earlier consumers get all partitions assigned locally else: - assert coordinator._subscription.needs_partition_assignment is False assert set(coordinator._subscription.assignment.keys()) == set([ TopicPartition('foo1', 0), TopicPartition('foo2', 0)]) @@ -195,7 +192,6 @@ def test_perform_assignment(mocker, coordinator): def test_on_join_prepare(coordinator): coordinator._subscription.subscribe(topics=['foobar']) coordinator._on_join_prepare(0, 'member-foo') - assert coordinator._subscription.needs_partition_assignment is True def test_need_rejoin(coordinator): @@ -205,13 +201,6 @@ def test_need_rejoin(coordinator): coordinator._subscription.subscribe(topics=['foobar']) assert coordinator.need_rejoin() is True - coordinator._subscription.needs_partition_assignment = False - coordinator.rejoin_needed = False - assert coordinator.need_rejoin() is False - - coordinator._subscription.needs_partition_assignment = True - assert coordinator.need_rejoin() is True - def test_refresh_committed_offsets_if_needed(mocker, coordinator): mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', @@ -388,7 +377,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, @pytest.fixture def patched_coord(mocker, coordinator): coordinator._subscription.subscribe(topics=['foobar']) - coordinator._subscription.needs_partition_assignment = False mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) coordinator.coordinator_id = 0 mocker.patch.object(coordinator, 'coordinator', return_value=0) @@ -461,47 +449,39 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets): offsets, future, mocker.ANY, response) -@pytest.mark.parametrize('response,error,dead,reassign', [ +@pytest.mark.parametrize('response,error,dead', [ (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]), - Errors.GroupAuthorizationFailedError, False, False), + Errors.GroupAuthorizationFailedError, False), (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]), - Errors.OffsetMetadataTooLargeError, False, False), + Errors.OffsetMetadataTooLargeError, False), (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), - Errors.InvalidCommitOffsetSizeError, False, False), + Errors.InvalidCommitOffsetSizeError, False), (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), - Errors.GroupLoadInProgressError, False, False), + Errors.GroupLoadInProgressError, False), (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), - Errors.GroupCoordinatorNotAvailableError, True, False), + Errors.GroupCoordinatorNotAvailableError, True), (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), - Errors.NotCoordinatorForGroupError, True, False), + Errors.NotCoordinatorForGroupError, True), (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), - Errors.RequestTimedOutError, True, False), + Errors.RequestTimedOutError, True), (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), - Errors.CommitFailedError, False, True), + Errors.CommitFailedError, False), (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]), - Errors.CommitFailedError, False, True), + Errors.CommitFailedError, False), (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]), - Errors.CommitFailedError, False, True), + Errors.CommitFailedError, False), (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]), - Errors.InvalidTopicError, False, False), + Errors.InvalidTopicError, False), (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), - Errors.TopicAuthorizationFailedError, False, False), + Errors.TopicAuthorizationFailedError, False), ]) def test_handle_offset_commit_response(mocker, patched_coord, offsets, - response, error, dead, reassign): + response, error, dead): future = Future() patched_coord._handle_offset_commit_response(offsets, future, time.time(), response) assert isinstance(future.exception, error) assert patched_coord.coordinator_id is (None if dead else 0) - if reassign: - assert patched_coord._generation is Generation.NO_GENERATION - assert patched_coord.rejoin_needed is True - assert patched_coord.state is MemberState.UNJOINED - else: - assert patched_coord._generation is not Generation.NO_GENERATION - assert patched_coord.rejoin_needed is False - assert patched_coord.state is MemberState.STABLE @pytest.fixture @@ -570,6 +550,10 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): Errors.GroupLoadInProgressError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), Errors.NotCoordinatorForGroupError, True), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + Errors.UnknownMemberIdError, False), + (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + Errors.IllegalGenerationError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), Errors.TopicAuthorizationFailedError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), @@ -627,7 +611,7 @@ def test_ensure_active_group(mocker, coordinator): coordinator._subscription.subscribe(topics=['foobar']) mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True)) - mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, True, False]) + mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, False]) mocker.patch.object(coordinator, '_on_join_complete') mocker.patch.object(coordinator, '_heartbeat_thread') |