From 277e4734757638678471b8682b52c9c5fe2839a4 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 16 Jul 2016 23:20:38 -0700
Subject: KAFKA-3117: handle metadata updates during consumer rebalance

---
 kafka/coordinator/consumer.py | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)

(limited to 'kafka')

diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 083a36a..05ebc56 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -81,7 +81,8 @@ class ConsumerCoordinator(BaseCoordinator):
         assert self.config['assignors'], 'Coordinator requires assignors'
 
         self._subscription = subscription
-        self._partitions_per_topic = {}
+        self._metadata_snapshot = {}
+        self._assignment_snapshot = None
         self._cluster = client.cluster
         self._cluster.request_update()
         self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
@@ -146,7 +147,7 @@ class ConsumerCoordinator(BaseCoordinator):
 
         # check if there are any changes to the metadata which should trigger
         # a rebalance
-        if self._subscription_metadata_changed():
+        if self._subscription_metadata_changed(cluster):
 
             if (self.config['api_version'] >= (0, 9)
                 and self.config['group_id'] is not None):
@@ -159,20 +160,20 @@ class ConsumerCoordinator(BaseCoordinator):
                 self._subscription.assign_from_subscribed([
                     TopicPartition(topic, partition)
                     for topic in self._subscription.subscription
-                    for partition in self._partitions_per_topic[topic]
+                    for partition in self._metadata_snapshot[topic]
                 ])
 
-    def _subscription_metadata_changed(self):
+    def _subscription_metadata_changed(self, cluster):
         if not self._subscription.partitions_auto_assigned():
             return False
 
-        old_partitions_per_topic = self._partitions_per_topic
-        self._partitions_per_topic = {}
+        metadata_snapshot = {}
         for topic in self._subscription.group_subscription():
-            partitions = self._cluster.partitions_for_topic(topic) or []
-            self._partitions_per_topic[topic] = set(partitions)
+            partitions = cluster.partitions_for_topic(topic) or []
+            metadata_snapshot[topic] = set(partitions)
 
-        if self._partitions_per_topic != old_partitions_per_topic:
+        if self._metadata_snapshot != metadata_snapshot:
+            self._metadata_snapshot = metadata_snapshot
             return True
         return False
 
@@ -184,8 +185,15 @@ class ConsumerCoordinator(BaseCoordinator):
 
     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
+        # if we were the assignor, then we need to make sure that there have
+        # been no metadata updates since the rebalance begin. Otherwise, we
+        # won't rebalance again until the next metadata change
+        if self._assignment_snapshot and self._assignment_snapshot != self._metadata_snapshot:
+            self._subscription.mark_for_reassignment()
+            return
+
         assignor = self._lookup_assignor(protocol)
-        assert assignor, 'invalid assignment protocol: %s' % protocol
+        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
 
         assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
 
@@ -235,6 +243,11 @@ class ConsumerCoordinator(BaseCoordinator):
         self._subscription.group_subscribe(all_subscribed_topics)
         self._client.set_topics(self._subscription.group_subscription())
 
+        # keep track of the metadata used for assignment so that we can check
+        # after rebalance completion whether anything has changed
+        self._cluster.request_update()
+        self._assignment_snapshot = self._metadata_snapshot
+
         log.debug("Performing assignment for group %s using strategy %s"
                   " with subscriptions %s", self.group_id, assignor.name,
                   member_metadata)
@@ -264,6 +277,7 @@ class ConsumerCoordinator(BaseCoordinator):
                               " for group %s failed on_partitions_revoked",
                               self._subscription.listener, self.group_id)
 
+        self._assignment_snapshot = None
         self._subscription.mark_for_reassignment()
 
     def need_rejoin(self):
--
cgit v1.2.1
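
The essential idea of the patch is a snapshot comparison: the coordinator records the topic metadata it used when computing an assignment, and after the rebalance completes it compares that snapshot against the latest metadata; if they differ, the assignment is stale and the consumer marks itself for reassignment instead of keeping it. What follows is a minimal, self-contained sketch of that pattern under invented names (FakeCluster, metadata_snapshot, Rebalancer) that are not part of the kafka-python API; it illustrates the compare-and-rejoin technique only, not the project's actual ConsumerCoordinator.

    # Illustrative sketch only -- names below are hypothetical, not kafka-python classes.

    class FakeCluster(object):
        """Stands in for the client's cluster metadata cache."""

        def __init__(self, partitions_per_topic):
            self.partitions_per_topic = partitions_per_topic

        def partitions_for_topic(self, topic):
            return self.partitions_per_topic.get(topic)


    def metadata_snapshot(cluster, topics):
        """Capture the partition sets for the subscribed topics."""
        return dict((topic, set(cluster.partitions_for_topic(topic) or []))
                    for topic in topics)


    class Rebalancer(object):
        """Toy coordinator mirroring the assignment/metadata snapshot comparison."""

        def __init__(self, cluster, topics):
            self._cluster = cluster
            self._topics = topics
            self._metadata_snapshot = metadata_snapshot(cluster, topics)
            self._assignment_snapshot = None

        def handle_metadata_update(self):
            # Refresh the snapshot whenever new metadata arrives.
            self._metadata_snapshot = metadata_snapshot(self._cluster, self._topics)

        def perform_assignment(self):
            # Remember exactly which metadata the assignment was computed from.
            self._assignment_snapshot = self._metadata_snapshot

        def on_join_complete(self):
            # If metadata changed while the rebalance was in flight, the
            # assignment is stale -- request another rebalance instead of
            # silently using it.
            if (self._assignment_snapshot is not None
                    and self._assignment_snapshot != self._metadata_snapshot):
                print('metadata changed during rebalance -- rejoining')
                return False
            print('assignment accepted')
            return True


    if __name__ == '__main__':
        cluster = FakeCluster({'my-topic': [0, 1]})
        coordinator = Rebalancer(cluster, ['my-topic'])
        coordinator.perform_assignment()

        # Simulate a partition-count change arriving mid-rebalance.
        cluster.partitions_per_topic['my-topic'] = [0, 1, 2]
        coordinator.handle_metadata_update()

        coordinator.on_join_complete()  # prints: metadata changed during rebalance -- rejoining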