Diffstat (limited to 'kafka/coordinator/consumer.py')
 kafka/coordinator/consumer.py | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 083a36a..05ebc56 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -81,7 +81,8 @@ class ConsumerCoordinator(BaseCoordinator):
         assert self.config['assignors'], 'Coordinator requires assignors'
 
         self._subscription = subscription
-        self._partitions_per_topic = {}
+        self._metadata_snapshot = {}
+        self._assignment_snapshot = None
         self._cluster = client.cluster
         self._cluster.request_update()
         self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
@@ -146,7 +147,7 @@ class ConsumerCoordinator(BaseCoordinator):
         # check if there are any changes to the metadata which should trigger
         # a rebalance
-        if self._subscription_metadata_changed():
+        if self._subscription_metadata_changed(cluster):
 
             if (self.config['api_version'] >= (0, 9)
                 and self.config['group_id'] is not None):
@@ -159,20 +160,20 @@ class ConsumerCoordinator(BaseCoordinator):
                 self._subscription.assign_from_subscribed([
                     TopicPartition(topic, partition)
                     for topic in self._subscription.subscription
-                    for partition in self._partitions_per_topic[topic]
+                    for partition in self._metadata_snapshot[topic]
                 ])
 
-    def _subscription_metadata_changed(self):
+    def _subscription_metadata_changed(self, cluster):
         if not self._subscription.partitions_auto_assigned():
             return False
 
-        old_partitions_per_topic = self._partitions_per_topic
-        self._partitions_per_topic = {}
+        metadata_snapshot = {}
         for topic in self._subscription.group_subscription():
-            partitions = self._cluster.partitions_for_topic(topic) or []
-            self._partitions_per_topic[topic] = set(partitions)
+            partitions = cluster.partitions_for_topic(topic) or []
+            metadata_snapshot[topic] = set(partitions)
 
-        if self._partitions_per_topic != old_partitions_per_topic:
+        if self._metadata_snapshot != metadata_snapshot:
+            self._metadata_snapshot = metadata_snapshot
             return True
         return False
 
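The rewritten _subscription_metadata_changed builds a fresh topic-to-partition-set snapshot from the cluster object handed to the metadata listener (rather than reading self._cluster again) and only stores it when it actually differs from the previous snapshot. A minimal standalone sketch of that comparison, using a hypothetical FakeCluster stub in place of kafka-python's real ClusterMetadata:

class FakeCluster(object):
    """Hypothetical stand-in for kafka-python's ClusterMetadata."""
    def __init__(self, partitions):
        self._partitions = partitions  # dict: topic -> iterable of ids

    def partitions_for_topic(self, topic):
        return self._partitions.get(topic)

def subscription_metadata_changed(cluster, topics, stored_snapshot):
    # rebuild the per-topic partition sets and compare to the stored copy
    snapshot = {}
    for topic in topics:
        partitions = cluster.partitions_for_topic(topic) or []
        snapshot[topic] = set(partitions)
    return snapshot != stored_snapshot, snapshot

changed, snap = subscription_metadata_changed(
    FakeCluster({'events': [0, 1]}), ['events'], {})
assert changed  # first pass always differs from the empty snapshot

# a partition is added to the topic; the next comparison flags the
# change, which is what triggers the rebalance in the patch above
changed, snap = subscription_metadata_changed(
    FakeCluster({'events': [0, 1, 2]}), ['events'], snap)
assert changed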
@@ -184,8 +185,15 @@ class ConsumerCoordinator(BaseCoordinator):
 
     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
+        # if we were the assignor, then we need to make sure that there have
+        # been no metadata updates since the rebalance began. Otherwise, we
+        # won't rebalance again until the next metadata change
+        if self._assignment_snapshot and self._assignment_snapshot != self._metadata_snapshot:
+            self._subscription.mark_for_reassignment()
+            return
+
         assignor = self._lookup_assignor(protocol)
-        assert assignor, 'invalid assignment protocol: %s' % protocol
+        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
 
         assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
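The guard added above closes a race window: the group leader computes an assignment from one metadata snapshot, and if the cluster metadata changes while the rebalance round-trip is in flight, the completed assignment is already stale; without the comparison, the group would not rebalance again until the next metadata change. A rough simulation of that interaction, with a simplified needs_reassignment flag standing in for SubscriptionState.mark_for_reassignment:

class CoordinatorSketch(object):
    """Simplified model of the snapshot bookkeeping in this patch."""
    def __init__(self):
        self._metadata_snapshot = {}
        self._assignment_snapshot = None
        self.needs_reassignment = False

    def handle_metadata_update(self, snapshot):
        # rebinds, never mutates, so a pinned snapshot is untouched
        self._metadata_snapshot = snapshot

    def perform_assignment(self):
        # the leader pins the snapshot its assignment was computed from
        self._assignment_snapshot = self._metadata_snapshot

    def on_join_complete(self):
        # a snapshot mismatch means the assignment is stale: rejoin
        # instead of consuming from an outdated partition layout
        if (self._assignment_snapshot is not None
                and self._assignment_snapshot != self._metadata_snapshot):
            self.needs_reassignment = True

c = CoordinatorSketch()
c.handle_metadata_update({'events': {0, 1}})
c.perform_assignment()                            # assign from 2 partitions
c.handle_metadata_update({'events': {0, 1, 2}})   # topic grows mid-rebalance
c.on_join_complete()
assert c.needs_reassignment                       # stale assignment rejected

Because the metadata listener rebinds the snapshot dict instead of mutating it, the pinned _assignment_snapshot keeps the old contents and the simple inequality test is enough to detect the change.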
@@ -235,6 +243,11 @@ class ConsumerCoordinator(BaseCoordinator):
         self._subscription.group_subscribe(all_subscribed_topics)
         self._client.set_topics(self._subscription.group_subscription())
 
+        # keep track of the metadata used for assignment so that we can check
+        # after rebalance completion whether anything has changed
+        self._cluster.request_update()
+        self._assignment_snapshot = self._metadata_snapshot
+
         log.debug("Performing assignment for group %s using strategy %s"
                   " with subscriptions %s", self.group_id, assignor.name,
                   member_metadata)
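Pinning the snapshot here means only the member that actually performed the assignment for this generation will trip the staleness check in _on_join_complete. The extra request_update() call appears intended to force a metadata refresh, so that any change racing with the rebalance is delivered to _handle_metadata_update and detected once the join completes.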
@@ -264,6 +277,7 @@ class ConsumerCoordinator(BaseCoordinator):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)
+ self._assignment_snapshot = None
self._subscription.mark_for_reassignment()
def need_rejoin(self):
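Clearing _assignment_snapshot in the revocation path resets the leader-side bookkeeping at the start of each rebalance: a member that does not perform the assignment in the new generation keeps the snapshot at None, so the staleness check in _on_join_complete is skipped for it instead of comparing against a snapshot left over from an earlier generation.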