summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2016-07-16 23:20:38 -0700
committer: Dana Powers <dana.powers@gmail.com> 2016-07-16 23:50:34 -0700
commit: 277e4734757638678471b8682b52c9c5fe2839a4 (patch)
tree: 177841b9805b282e67102fb89302672df307c20b
parent: 506d023978e7273bd323c0750e3f77af259d257b (diff)
download: kafka-python-KAFKA-3117.tar.gz
KAFKA-3117: handle metadata updates during consumer rebalance [KAFKA-3117]
-rw-r--r-- kafka/coordinator/consumer.py 34
-rw-r--r-- test/test_coordinator.py 2
2 files changed, 25 insertions, 11 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 083a36a..05ebc56 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -81,7 +81,8 @@ class ConsumerCoordinator(BaseCoordinator):
assert self.config['assignors'], 'Coordinator requires assignors'
self._subscription = subscription
- self._partitions_per_topic = {}
+ self._metadata_snapshot = {}
+ self._assignment_snapshot = None
self._cluster = client.cluster
self._cluster.request_update()
self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
@@ -146,7 +147,7 @@ class ConsumerCoordinator(BaseCoordinator):
# check if there are any changes to the metadata which should trigger
# a rebalance
- if self._subscription_metadata_changed():
+ if self._subscription_metadata_changed(cluster):
if (self.config['api_version'] >= (0, 9)
and self.config['group_id'] is not None):
@@ -159,20 +160,20 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.assign_from_subscribed([
TopicPartition(topic, partition)
for topic in self._subscription.subscription
- for partition in self._partitions_per_topic[topic]
+ for partition in self._metadata_snapshot[topic]
])
- def _subscription_metadata_changed(self):
+ def _subscription_metadata_changed(self, cluster):
if not self._subscription.partitions_auto_assigned():
return False
- old_partitions_per_topic = self._partitions_per_topic
- self._partitions_per_topic = {}
+ metadata_snapshot = {}
for topic in self._subscription.group_subscription():
- partitions = self._cluster.partitions_for_topic(topic) or []
- self._partitions_per_topic[topic] = set(partitions)
+ partitions = cluster.partitions_for_topic(topic) or []
+ metadata_snapshot[topic] = set(partitions)
- if self._partitions_per_topic != old_partitions_per_topic:
+ if self._metadata_snapshot != metadata_snapshot:
+ self._metadata_snapshot = metadata_snapshot
return True
return False
@@ -184,8 +185,15 @@ class ConsumerCoordinator(BaseCoordinator):
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
+ # if we were the assignor, then we need to make sure that there have
+ # been no metadata updates since the rebalance begin. Otherwise, we
+ # won't rebalance again until the next metadata change
+ if self._assignment_snapshot and self._assignment_snapshot != self._metadata_snapshot:
+ self._subscription.mark_for_reassignment()
+ return
+
assignor = self._lookup_assignor(protocol)
- assert assignor, 'invalid assignment protocol: %s' % protocol
+ assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
@@ -235,6 +243,11 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.group_subscribe(all_subscribed_topics)
self._client.set_topics(self._subscription.group_subscription())
+ # keep track of the metadata used for assignment so that we can check
+ # after rebalance completion whether anything has changed
+ self._cluster.request_update()
+ self._assignment_snapshot = self._metadata_snapshot
+
log.debug("Performing assignment for group %s using strategy %s"
" with subscriptions %s", self.group_id, assignor.name,
member_metadata)
@@ -264,6 +277,7 @@ class ConsumerCoordinator(BaseCoordinator):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)
+ self._assignment_snapshot = None
self._subscription.mark_for_reassignment()
def need_rejoin(self):
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 3435292..280fa70 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -85,7 +85,7 @@ def test_pattern_subscription(coordinator, api_version):
coordinator.config['api_version'] = api_version
coordinator._subscription.subscribe(pattern='foo')
assert coordinator._subscription.subscription == set([])
- assert coordinator._subscription_metadata_changed() is False
+ assert coordinator._subscription_metadata_changed({}) is False
assert coordinator._subscription.needs_partition_assignment is False
cluster = coordinator._client.cluster