From dd50847a9beaa9490ae35c7b1bc18d0780b92726 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 17 Jul 2016 08:37:44 -0700 Subject: KAFKA-3117: handle metadata updates during consumer rebalance (#766 / #701) --- kafka/coordinator/consumer.py | 34 ++++++++++++++++++++++++---------- test/test_coordinator.py | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 2543238..a18329c 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -86,7 +86,8 @@ class ConsumerCoordinator(BaseCoordinator): assert self.config['assignors'], 'Coordinator requires assignors' self._subscription = subscription - self._partitions_per_topic = {} + self._metadata_snapshot = {} + self._assignment_snapshot = None self._cluster = client.cluster self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) @@ -150,7 +151,7 @@ class ConsumerCoordinator(BaseCoordinator): # check if there are any changes to the metadata which should trigger # a rebalance - if self._subscription_metadata_changed(): + if self._subscription_metadata_changed(cluster): if (self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None): @@ -163,20 +164,20 @@ class ConsumerCoordinator(BaseCoordinator): self._subscription.assign_from_subscribed([ TopicPartition(topic, partition) for topic in self._subscription.subscription - for partition in self._partitions_per_topic[topic] + for partition in self._metadata_snapshot[topic] ]) - def _subscription_metadata_changed(self): + def _subscription_metadata_changed(self, cluster): if not self._subscription.partitions_auto_assigned(): return False - old_partitions_per_topic = self._partitions_per_topic - self._partitions_per_topic = {} + metadata_snapshot = {} for topic in self._subscription.group_subscription(): - partitions = self._cluster.partitions_for_topic(topic) or [] - self._partitions_per_topic[topic] = set(partitions) + partitions = cluster.partitions_for_topic(topic) or [] + metadata_snapshot[topic] = set(partitions) - if self._partitions_per_topic != old_partitions_per_topic: + if self._metadata_snapshot != metadata_snapshot: + self._metadata_snapshot = metadata_snapshot return True return False @@ -188,8 +189,15 @@ class ConsumerCoordinator(BaseCoordinator): def _on_join_complete(self, generation, member_id, protocol, member_assignment_bytes): + # if we were the assignor, then we need to make sure that there have + # been no metadata updates since the rebalance begin. Otherwise, we + # won't rebalance again until the next metadata change + if self._assignment_snapshot and self._assignment_snapshot != self._metadata_snapshot: + self._subscription.mark_for_reassignment() + return + assignor = self._lookup_assignor(protocol) - assert assignor, 'invalid assignment protocol: %s' % protocol + assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -239,6 +247,11 @@ class ConsumerCoordinator(BaseCoordinator): self._subscription.group_subscribe(all_subscribed_topics) self._client.set_topics(self._subscription.group_subscription()) + # keep track of the metadata used for assignment so that we can check + # after rebalance completion whether anything has changed + self._cluster.request_update() + self._assignment_snapshot = self._metadata_snapshot + log.debug("Performing assignment for group %s using strategy %s" " with subscriptions %s", self.group_id, assignor.name, member_metadata) @@ -268,6 +281,7 @@ class ConsumerCoordinator(BaseCoordinator): " for group %s failed on_partitions_revoked", self._subscription.listener, self.group_id) + self._assignment_snapshot = None self._subscription.mark_for_reassignment() def need_rejoin(self): diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 3435292..280fa70 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -85,7 +85,7 @@ def test_pattern_subscription(coordinator, api_version): coordinator.config['api_version'] = api_version coordinator._subscription.subscribe(pattern='foo') assert coordinator._subscription.subscription == set([]) - assert coordinator._subscription_metadata_changed() is False + assert coordinator._subscription_metadata_changed({}) is False assert coordinator._subscription.needs_partition_assignment is False cluster = coordinator._client.cluster -- cgit v1.2.1