author     Valeria Chernenko <aynroot@users.noreply.github.com>  2020-09-30 06:03:54 +0200
committer  GitHub <noreply@github.com>  2020-09-29 21:03:54 -0700
commit     c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93 (patch)
tree       40b412379666620a8a5173932652e94bdc9439b2
parent     cb96a1a6c79c17ac9b3399b7a33bbaea7ad8886f (diff)
download   kafka-python-c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93.tar.gz

KIP-54: Implement sticky partition assignment strategy (#2057)
Diffstat:
-rw-r--r--  kafka/coordinator/assignors/sticky/__init__.py              0
-rw-r--r--  kafka/coordinator/assignors/sticky/partition_movements.py   149
-rw-r--r--  kafka/coordinator/assignors/sticky/sorted_set.py            63
-rw-r--r--  kafka/coordinator/assignors/sticky/sticky_assignor.py       681
-rw-r--r--  kafka/coordinator/consumer.py                               5
-rw-r--r--  setup.py                                                    1
-rw-r--r--  test/test_assignors.py                                      843
-rw-r--r--  test/test_coordinator.py                                    36
-rw-r--r--  test/test_partition_movements.py                            23
9 files changed, 1781 insertions, 20 deletions
diff --git a/kafka/coordinator/assignors/sticky/__init__.py b/kafka/coordinator/assignors/sticky/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/kafka/coordinator/assignors/sticky/__init__.py
diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/kafka/coordinator/assignors/sticky/partition_movements.py
new file mode 100644
index 0000000..8851e4c
--- /dev/null
+++ b/kafka/coordinator/assignors/sticky/partition_movements.py
@@ -0,0 +1,149 @@
+import logging
+from collections import defaultdict, namedtuple
+from copy import deepcopy
+
+from kafka.vendor import six
+
+log = logging.getLogger(__name__)
+
+
+ConsumerPair = namedtuple("ConsumerPair", ["src_member_id", "dst_member_id"])
+"""
+Represents a pair of Kafka consumer ids involved in a partition reassignment.
+Each ConsumerPair corresponds to a particular partition or topic and indicates that the particular partition, or some
+partition of the particular topic, was moved from the source consumer to the destination consumer
+during the rebalance. This class helps in determining whether a partition reassignment results in cycles
+in the generated graph of consumer pairs.
+"""
+
+
+def is_sublist(source, target):
+ """Checks if one list is a sublist of another.
+
+ Arguments:
+ source: the list in which to search for the occurrence of target.
+ target: the list to search for as a sublist of source
+
+ Returns:
+ true if target is a contiguous sublist of source; false otherwise
+ """
+ for index in (i for i, e in enumerate(source) if e == target[0]):
+ if tuple(source[index: index + len(target)]) == target:
+ return True
+ return False
+
+
+class PartitionMovements:
+ """
+ This class maintains data structures that simplify lookup of partition movements among consumers.
+ At any point during a partition rebalance it keeps track of the partition movements
+ corresponding to each topic, as well as the possible movement (in the form of a ConsumerPair object) for each partition.
+ """
+
+ def __init__(self):
+ self.partition_movements_by_topic = defaultdict(
+ lambda: defaultdict(set)
+ )
+ self.partition_movements = {}
+
+ def move_partition(self, partition, old_consumer, new_consumer):
+ pair = ConsumerPair(src_member_id=old_consumer, dst_member_id=new_consumer)
+ if partition in self.partition_movements:
+ # this partition has previously moved
+ existing_pair = self._remove_movement_record_of_partition(partition)
+ assert existing_pair.dst_member_id == old_consumer
+ if existing_pair.src_member_id != new_consumer:
+ # the partition is not moving back to its previous consumer
+ self._add_partition_movement_record(
+ partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer)
+ )
+ else:
+ self._add_partition_movement_record(partition, pair)
+
+ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
+ if partition.topic not in self.partition_movements_by_topic:
+ return partition
+ if partition in self.partition_movements:
+ # this partition has previously moved
+ assert old_consumer == self.partition_movements[partition].dst_member_id
+ old_consumer = self.partition_movements[partition].src_member_id
+ reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer)
+ if reverse_pair not in self.partition_movements_by_topic[partition.topic]:
+ return partition
+
+ return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))
+
+ def are_sticky(self):
+ for topic, movements in six.iteritems(self.partition_movements_by_topic):
+ movement_pairs = set(movements.keys())
+ if self._has_cycles(movement_pairs):
+ log.error(
+ "Stickiness is violated for topic {}\n"
+ "Partition movements for this topic occurred among the following consumer pairs:\n"
+ "{}".format(topic, movement_pairs)
+ )
+ return False
+ return True
+
+ def _remove_movement_record_of_partition(self, partition):
+ pair = self.partition_movements[partition]
+ del self.partition_movements[partition]
+
+ self.partition_movements_by_topic[partition.topic][pair].remove(partition)
+ if not self.partition_movements_by_topic[partition.topic][pair]:
+ del self.partition_movements_by_topic[partition.topic][pair]
+ if not self.partition_movements_by_topic[partition.topic]:
+ del self.partition_movements_by_topic[partition.topic]
+
+ return pair
+
+ def _add_partition_movement_record(self, partition, pair):
+ self.partition_movements[partition] = pair
+ self.partition_movements_by_topic[partition.topic][pair].add(partition)
+
+ def _has_cycles(self, consumer_pairs):
+ cycles = set()
+ for pair in consumer_pairs:
+ reduced_pairs = deepcopy(consumer_pairs)
+ reduced_pairs.remove(pair)
+ path = [pair.src_member_id]
+ if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle(
+ path, cycles
+ ):
+ cycles.add(tuple(path))
+ log.error("A cycle of length {} was found: {}".format(len(path) - 1, path))
+
+ # for now we want to make sure there are no partition movements of the same topic between a pair of consumers.
+ # the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
+ # tests with the given sticky algorithm), so it is not worth the added complexity of handling those cases.
+ for cycle in cycles:
+ if len(cycle) == 3: # indicates a cycle of length 2
+ return True
+ return False
+
+ @staticmethod
+ def _is_subcycle(cycle, cycles):
+ super_cycle = deepcopy(cycle)
+ super_cycle = super_cycle[:-1]
+ super_cycle.extend(cycle)
+ for found_cycle in cycles:
+ if len(found_cycle) == len(cycle) and is_sublist(super_cycle, found_cycle):
+ return True
+ return False
+
+ def _is_linked(self, src, dst, pairs, current_path):
+ if src == dst:
+ return False
+ if not pairs:
+ return False
+ if ConsumerPair(src, dst) in pairs:
+ current_path.append(src)
+ current_path.append(dst)
+ return True
+ for pair in pairs:
+ if pair.src_member_id == src:
+ reduced_set = deepcopy(pairs)
+ reduced_set.remove(pair)
+ current_path.append(pair.src_member_id)
+ return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path)
+ return False
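
As a hedged illustration of the bookkeeping described in the docstrings above (my own example; the consumer ids and topic name are made up, and it assumes this patch is applied so the import path exists), the sketch below moves the same partition twice and shows that only the net movement is recorded, after which the movements are still considered sticky because no two consumers exchanged partitions of the same topic:

    # Illustrative sketch only; 'C1'/'C2'/'C3' and topic 't0' are placeholders.
    from kafka.structs import TopicPartition
    from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements

    movements = PartitionMovements()
    tp = TopicPartition(topic='t0', partition=0)

    # t0-0 moves C1 -> C2 and then C2 -> C3; only the net C1 -> C3 pair is kept.
    movements.move_partition(tp, 'C1', 'C2')
    movements.move_partition(tp, 'C2', 'C3')
    print(movements.partition_movements[tp])  # ConsumerPair(src_member_id='C1', dst_member_id='C3')

    # No pair of consumers swapped partitions of the same topic, so there is no
    # length-2 cycle and the movements count as sticky.
    print(movements.are_sticky())             # True
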
diff --git a/kafka/coordinator/assignors/sticky/sorted_set.py b/kafka/coordinator/assignors/sticky/sorted_set.py
new file mode 100644
index 0000000..6a454a4
--- /dev/null
+++ b/kafka/coordinator/assignors/sticky/sorted_set.py
@@ -0,0 +1,63 @@
+class SortedSet:
+ def __init__(self, iterable=None, key=None):
+ self._key = key if key is not None else lambda x: x
+ self._set = set(iterable) if iterable is not None else set()
+
+ self._cached_last = None
+ self._cached_first = None
+
+ def first(self):
+ if self._cached_first is not None:
+ return self._cached_first
+
+ first = None
+ for element in self._set:
+ if first is None or self._key(first) > self._key(element):
+ first = element
+ self._cached_first = first
+ return first
+
+ def last(self):
+ if self._cached_last is not None:
+ return self._cached_last
+
+ last = None
+ for element in self._set:
+ if last is None or self._key(last) < self._key(element):
+ last = element
+ self._cached_last = last
+ return last
+
+ def pop_last(self):
+ value = self.last()
+ self._set.remove(value)
+ self._cached_last = None
+ return value
+
+ def add(self, value):
+ if self._cached_last is not None and self._key(value) > self._key(self._cached_last):
+ self._cached_last = value
+ if self._cached_first is not None and self._key(value) < self._key(self._cached_first):
+ self._cached_first = value
+
+ return self._set.add(value)
+
+ def remove(self, value):
+ if self._cached_last is not None and self._cached_last == value:
+ self._cached_last = None
+ if self._cached_first is not None and self._cached_first == value:
+ self._cached_first = None
+
+ return self._set.remove(value)
+
+ def __contains__(self, value):
+ return value in self._set
+
+ def __iter__(self):
+ return iter(sorted(self._set, key=self._key))
+
+ def _bool(self):
+ return len(self._set) != 0
+
+ __nonzero__ = _bool
+ __bool__ = _bool
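
SortedSet above is a small purpose-built helper rather than a general container: it sorts lazily on iteration and only caches the first/last lookups that the assignor needs. A brief usage sketch (illustrative only; the key below mirrors the subscriptions_comparator_key used by the assignor in the next file):

    from kafka.coordinator.assignors.sticky.sorted_set import SortedSet

    # Order (consumer, assigned-partitions) pairs by assignment size, then by consumer id.
    consumers = SortedSet(
        iterable=[('C1', ('t-0', 't-1')), ('C2', ('t-2',))],
        key=lambda pair: (len(pair[1]), pair[0]),
    )
    print(consumers.first())  # ('C2', ('t-2',))        -- fewest assigned partitions
    print(consumers.last())   # ('C1', ('t-0', 't-1'))  -- most assigned partitions

    # Adding a consumer with no partitions updates the cached minimum.
    consumers.add(('C3', ()))
    print(consumers.first())  # ('C3', ())
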
diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py
new file mode 100644
index 0000000..7827086
--- /dev/null
+++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py
@@ -0,0 +1,681 @@
+import logging
+from collections import defaultdict, namedtuple
+from copy import deepcopy
+
+from kafka.cluster import ClusterMetadata
+from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
+from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
+from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.coordinator.protocol import Schema
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import String, Array, Int32
+from kafka.structs import TopicPartition
+from kafka.vendor import six
+
+log = logging.getLogger(__name__)
+
+ConsumerGenerationPair = namedtuple("ConsumerGenerationPair", ["consumer", "generation"])
+
+
+def has_identical_list_elements(list_):
+ """Checks if all lists in the collection have the same members
+
+ Arguments:
+ list_: collection of lists
+
+ Returns:
+ true if all lists in the collection have the same members; false otherwise
+ """
+ if not list_:
+ return True
+ for i in range(1, len(list_)):
+ if list_[i] != list_[i - 1]:
+ return False
+ return True
+
+
+def subscriptions_comparator_key(element):
+ return len(element[1]), element[0]
+
+
+def partitions_comparator_key(element):
+ return len(element[1]), element[0].topic, element[0].partition
+
+
+def remove_if_present(collection, element):
+ try:
+ collection.remove(element)
+ except (ValueError, KeyError):
+ pass
+
+
+StickyAssignorMemberMetadataV1 = namedtuple("StickyAssignorMemberMetadataV1",
+ ["subscription", "partitions", "generation"])
+
+
+class StickyAssignorUserDataV1(Struct):
+ """
+ Used for preserving consumer's previously assigned partitions
+ list and sending it as user data to the leader during a rebalance
+ """
+
+ SCHEMA = Schema(
+ ("previous_assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), ("generation", Int32)
+ )
+
+
+class StickyAssignmentExecutor:
+ def __init__(self, cluster, members):
+ self.members = members
+ # a mapping between consumers and their assigned partitions that is updated during assignment procedure
+ self.current_assignment = defaultdict(list)
+ # an assignment from a previous generation
+ self.previous_assignment = {}
+ # a mapping between partitions and their assigned consumers
+ self.current_partition_consumer = {}
+ # a flag indicating that no assignment has ever been performed before (i.e. this is a fresh assignment)
+ self.is_fresh_assignment = False
+ # a mapping of all topic partitions to all consumers that can be assigned to them
+ self.partition_to_all_potential_consumers = {}
+ # a mapping of all consumers to all potential topic partitions that can be assigned to them
+ self.consumer_to_all_potential_partitions = {}
+ # an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+ self.sorted_current_subscriptions = SortedSet()
+ # an ascending sorted list of topic partitions based on how many consumers can potentially use them
+ self.sorted_partitions = []
+ # all partitions that need to be assigned
+ self.unassigned_partitions = []
+ # a flag indicating that a certain partition cannot remain assigned to its current consumer because the consumer
+ # is no longer subscribed to its topic
+ self.revocation_required = False
+
+ self.partition_movements = PartitionMovements()
+ self._initialize(cluster)
+
+ def perform_initial_assignment(self):
+ self._populate_sorted_partitions()
+ self._populate_partitions_to_reassign()
+
+ def balance(self):
+ self._initialize_current_subscriptions()
+ initializing = len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) == 0
+
+ # assign all unassigned partitions
+ for partition in self.unassigned_partitions:
+ # skip if there is no potential consumer for the partition
+ if not self.partition_to_all_potential_consumers[partition]:
+ continue
+ self._assign_partition(partition)
+
+ # narrow down the reassignment scope to only those partitions that can actually be reassigned
+ fixed_partitions = set()
+ for partition in six.iterkeys(self.partition_to_all_potential_consumers):
+ if not self._can_partition_participate_in_reassignment(partition):
+ fixed_partitions.add(partition)
+ for fixed_partition in fixed_partitions:
+ remove_if_present(self.sorted_partitions, fixed_partition)
+ remove_if_present(self.unassigned_partitions, fixed_partition)
+
+ # narrow down the reassignment scope to only those consumers that are subject to reassignment
+ fixed_assignments = {}
+ for consumer in six.iterkeys(self.consumer_to_all_potential_partitions):
+ if not self._can_consumer_participate_in_reassignment(consumer):
+ self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer)
+ fixed_assignments[consumer] = self.current_assignment[consumer]
+ del self.current_assignment[consumer]
+
+ # create a deep copy of the current assignment so we can revert to it
+ # if we do not get a more balanced assignment later
+ prebalance_assignment = deepcopy(self.current_assignment)
+ prebalance_partition_consumers = deepcopy(self.current_partition_consumer)
+
+ # if we don't already need to revoke something due to subscription changes,
+ # first try to balance by only moving newly added partitions
+ if not self.revocation_required:
+ self._perform_reassignments(self.unassigned_partitions)
+ reassignment_performed = self._perform_reassignments(self.sorted_partitions)
+
+ # if this is a reassignment (not the initial assignment) and we have made changes to the current assignment,
+ # make sure it is more balanced than before; otherwise, revert to the previous assignment
+ if (
+ not initializing
+ and reassignment_performed
+ and self._get_balance_score(self.current_assignment) >= self._get_balance_score(prebalance_assignment)
+ ):
+ self.current_assignment = prebalance_assignment
+ self.current_partition_consumer.clear()
+ self.current_partition_consumer.update(prebalance_partition_consumers)
+
+ # add the fixed assignments (those that could not change) back
+ for consumer, partitions in six.iteritems(fixed_assignments):
+ self.current_assignment[consumer] = partitions
+ self._add_consumer_to_current_subscriptions_and_maintain_order(consumer)
+
+ def get_final_assignment(self, member_id):
+ assignment = defaultdict(list)
+ for topic_partition in self.current_assignment[member_id]:
+ assignment[topic_partition.topic].append(topic_partition.partition)
+ assignment = {k: sorted(v) for k, v in six.iteritems(assignment)}
+ return six.viewitems(assignment)
+
+ def _initialize(self, cluster):
+ self._init_current_assignments(self.members)
+
+ for topic in cluster.topics():
+ partitions = cluster.partitions_for_topic(topic)
+ if partitions is None:
+ log.warning("No partition metadata for topic %s", topic)
+ continue
+ for p in partitions:
+ partition = TopicPartition(topic=topic, partition=p)
+ self.partition_to_all_potential_consumers[partition] = []
+ for consumer_id, member_metadata in six.iteritems(self.members):
+ self.consumer_to_all_potential_partitions[consumer_id] = []
+ for topic in member_metadata.subscription:
+ if cluster.partitions_for_topic(topic) is None:
+ log.warning("No partition metadata for topic {}".format(topic))
+ continue
+ for p in cluster.partitions_for_topic(topic):
+ partition = TopicPartition(topic=topic, partition=p)
+ self.consumer_to_all_potential_partitions[consumer_id].append(partition)
+ self.partition_to_all_potential_consumers[partition].append(consumer_id)
+ if consumer_id not in self.current_assignment:
+ self.current_assignment[consumer_id] = []
+
+ def _init_current_assignments(self, members):
+ # we need to process subscriptions' user data with each consumer's reported generation in mind
+ # higher generations overwrite lower generations in case of a conflict
+ # note that a conflict can exist only if the user data is from different generations
+
+ # for each partition we create a map of its consumers by generation
+ sorted_partition_consumers_by_generation = {}
+ for consumer, member_metadata in six.iteritems(members):
+ for partitions in member_metadata.partitions:
+ if partitions in sorted_partition_consumers_by_generation:
+ consumers = sorted_partition_consumers_by_generation[partitions]
+ if member_metadata.generation and member_metadata.generation in consumers:
+ # same partition is assigned to two consumers during the same rebalance.
+ # log a warning and skip this record
+ log.warning(
+ "Partition {} is assigned to multiple consumers "
+ "following sticky assignment generation {}.".format(partitions, member_metadata.generation)
+ )
+ else:
+ consumers[member_metadata.generation] = consumer
+ else:
+ sorted_consumers = {member_metadata.generation: consumer}
+ sorted_partition_consumers_by_generation[partitions] = sorted_consumers
+
+ # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition
+ # current and previous consumers are the last two consumers of each partition in the above sorted map
+ for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation):
+ generations = sorted(consumers.keys(), reverse=True)
+ self.current_assignment[consumers[generations[0]]].append(partitions)
+ # now update previous assignment if any
+ if len(generations) > 1:
+ self.previous_assignment[partitions] = ConsumerGenerationPair(
+ consumer=consumers[generations[1]], generation=generations[1]
+ )
+
+ self.is_fresh_assignment = len(self.current_assignment) == 0
+
+ for consumer_id, partitions in six.iteritems(self.current_assignment):
+ for partition in partitions:
+ self.current_partition_consumer[partition] = consumer_id
+
+ def _are_subscriptions_identical(self):
+ """
+ Returns:
+ true, if all partitions have the same set of potential consumers and all consumers have
+ the same set of potential partitions; false otherwise
+ """
+ if not has_identical_list_elements(list(six.itervalues(self.partition_to_all_potential_consumers))):
+ return False
+ return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions)))
+
+ def _populate_sorted_partitions(self):
+ # set of topic partitions with their respective potential consumers
+ all_partitions = set((tp, tuple(consumers))
+ for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers))
+ partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key)
+
+ self.sorted_partitions = []
+ if not self.is_fresh_assignment and self._are_subscriptions_identical():
+ # if this is a reassignment and the subscriptions are identical (all consumers can consume from all topics),
+ # then we just need to list the partitions in a round robin fashion (from the consumers with the
+ # most assigned partitions to those with the least)
+ assignments = deepcopy(self.current_assignment)
+ for consumer_id, partitions in six.iteritems(assignments):
+ to_remove = []
+ for partition in partitions:
+ if partition not in self.partition_to_all_potential_consumers:
+ to_remove.append(partition)
+ for partition in to_remove:
+ partitions.remove(partition)
+
+ sorted_consumers = SortedSet(
+ iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(assignments)],
+ key=subscriptions_comparator_key,
+ )
+ # at this point, sorted_consumers contains an ascending-sorted list of consumers based on
+ # how many valid partitions are currently assigned to them
+ while sorted_consumers:
+ # take the consumer with the most partitions
+ consumer, _ = sorted_consumers.pop_last()
+ # currently assigned partitions to this consumer
+ remaining_partitions = assignments[consumer]
+ # from partitions that had a different consumer before,
+ # keep only those that are assigned to this consumer now
+ previous_partitions = set(six.iterkeys(self.previous_assignment)).intersection(set(remaining_partitions))
+ if previous_partitions:
+ # if there is a partition of this consumer that was assigned to another consumer before,
+ # mark it as a good option for reassignment
+ partition = previous_partitions.pop()
+ remaining_partitions.remove(partition)
+ self.sorted_partitions.append(partition)
+ sorted_consumers.add((consumer, tuple(assignments[consumer])))
+ elif remaining_partitions:
+ # otherwise, mark any other one of the current partitions as a reassignment candidate
+ self.sorted_partitions.append(remaining_partitions.pop())
+ sorted_consumers.add((consumer, tuple(assignments[consumer])))
+
+ while partitions_sorted_by_num_of_potential_consumers:
+ partition = partitions_sorted_by_num_of_potential_consumers.pop(0)[0]
+ if partition not in self.sorted_partitions:
+ self.sorted_partitions.append(partition)
+ else:
+ while partitions_sorted_by_num_of_potential_consumers:
+ self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0])
+
+ def _populate_partitions_to_reassign(self):
+ self.unassigned_partitions = deepcopy(self.sorted_partitions)
+
+ assignments_to_remove = []
+ for consumer_id, partitions in six.iteritems(self.current_assignment):
+ if consumer_id not in self.members:
+ # if a consumer that existed before (and had some partition assignments) is now removed,
+ # remove it from current_assignment
+ for partition in partitions:
+ del self.current_partition_consumer[partition]
+ assignments_to_remove.append(consumer_id)
+ else:
+ # otherwise (the consumer still exists)
+ partitions_to_remove = []
+ for partition in partitions:
+ if partition not in self.partition_to_all_potential_consumers:
+ # if this topic partition of this consumer no longer exists
+ # remove it from current_assignment of the consumer
+ partitions_to_remove.append(partition)
+ elif partition.topic not in self.members[consumer_id].subscription:
+ # if this partition cannot remain assigned to its current consumer because the consumer
+ # is no longer subscribed to its topic remove it from current_assignment of the consumer
+ partitions_to_remove.append(partition)
+ self.revocation_required = True
+ else:
+ # otherwise, remove the topic partition from those that need to be assigned only if
+ # its current consumer is still subscribed to its topic (because it is already assigned
+ # and we would want to preserve that assignment as much as possible)
+ self.unassigned_partitions.remove(partition)
+ for partition in partitions_to_remove:
+ self.current_assignment[consumer_id].remove(partition)
+ del self.current_partition_consumer[partition]
+ for consumer_id in assignments_to_remove:
+ del self.current_assignment[consumer_id]
+
+ def _initialize_current_subscriptions(self):
+ self.sorted_current_subscriptions = SortedSet(
+ iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(self.current_assignment)],
+ key=subscriptions_comparator_key,
+ )
+
+ def _get_consumer_with_least_subscriptions(self):
+ return self.sorted_current_subscriptions.first()[0]
+
+ def _get_consumer_with_most_subscriptions(self):
+ return self.sorted_current_subscriptions.last()[0]
+
+ def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer):
+ self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer])))
+
+ def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer):
+ self.sorted_current_subscriptions.add((consumer, tuple(self.current_assignment[consumer])))
+
+ def _is_balanced(self):
+ """Determines if the current assignment is a balanced one"""
+ if (
+ len(self.current_assignment[self._get_consumer_with_least_subscriptions()])
+ >= len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) - 1
+ ):
+ # if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
+ return True
+
+ # create a mapping from partitions to the consumer assigned to them
+ all_assigned_partitions = {}
+ for consumer_id, consumer_partitions in six.iteritems(self.current_assignment):
+ for partition in consumer_partitions:
+ if partition in all_assigned_partitions:
+ log.error("{} is assigned to more than one consumer.".format(partition))
+ all_assigned_partitions[partition] = consumer_id
+
+ # for each consumer that does not have all the topic partitions it can get,
+ # make sure that none of the topic partitions it could get but did not get can be moved to it
+ # (because that would break the balance)
+ for consumer, _ in self.sorted_current_subscriptions:
+ consumer_partition_count = len(self.current_assignment[consumer])
+ # skip if this consumer already has all the topic partitions it can get
+ if consumer_partition_count == len(self.consumer_to_all_potential_partitions[consumer]):
+ continue
+
+ # otherwise make sure it cannot get any more
+ for partition in self.consumer_to_all_potential_partitions[consumer]:
+ if partition not in self.current_assignment[consumer]:
+ other_consumer = all_assigned_partitions[partition]
+ other_consumer_partition_count = len(self.current_assignment[other_consumer])
+ if consumer_partition_count < other_consumer_partition_count:
+ return False
+ return True
+
+ def _assign_partition(self, partition):
+ for consumer, _ in self.sorted_current_subscriptions:
+ if partition in self.consumer_to_all_potential_partitions[consumer]:
+ self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer)
+ self.current_assignment[consumer].append(partition)
+ self.current_partition_consumer[partition] = consumer
+ self._add_consumer_to_current_subscriptions_and_maintain_order(consumer)
+ break
+
+ def _can_partition_participate_in_reassignment(self, partition):
+ return len(self.partition_to_all_potential_consumers[partition]) >= 2
+
+ def _can_consumer_participate_in_reassignment(self, consumer):
+ current_partitions = self.current_assignment[consumer]
+ current_assignment_size = len(current_partitions)
+ max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer])
+ if current_assignment_size > max_assignment_size:
+ log.error("The consumer {} is assigned more partitions than the maximum possible.".format(consumer))
+ if current_assignment_size < max_assignment_size:
+ # if a consumer is not assigned all its potential partitions it is subject to reassignment
+ return True
+ for partition in current_partitions:
+ # if any of the partitions assigned to a consumer is subject to reassignment the consumer itself
+ # is subject to reassignment
+ if self._can_partition_participate_in_reassignment(partition):
+ return True
+ return False
+
+ def _perform_reassignments(self, reassignable_partitions):
+ reassignment_performed = False
+
+ # repeat reassignment until no partition can be moved to improve the balance
+ while True:
+ modified = False
+ # reassign all reassignable partitions (starting from the partition with the fewest potential consumers)
+ # until the full list is processed or a balance is achieved
+ for partition in reassignable_partitions:
+ if self._is_balanced():
+ break
+ # the partition must have at least two potential consumers
+ if len(self.partition_to_all_potential_consumers[partition]) <= 1:
+ log.error("Expected more than one potential consumer for partition {}".format(partition))
+ # the partition must have a current consumer
+ consumer = self.current_partition_consumer.get(partition)
+ if consumer is None:
+ log.error("Expected partition {} to be assigned to a consumer".format(partition))
+
+ if (
+ partition in self.previous_assignment
+ and len(self.current_assignment[consumer])
+ > len(self.current_assignment[self.previous_assignment[partition].consumer]) + 1
+ ):
+ self._reassign_partition_to_consumer(
+ partition, self.previous_assignment[partition].consumer,
+ )
+ reassignment_performed = True
+ modified = True
+ continue
+
+ # check if a better-suited consumer exists for the partition; if so, reassign it
+ for other_consumer in self.partition_to_all_potential_consumers[partition]:
+ if len(self.current_assignment[consumer]) > len(self.current_assignment[other_consumer]) + 1:
+ self._reassign_partition(partition)
+ reassignment_performed = True
+ modified = True
+ break
+
+ if not modified:
+ break
+ return reassignment_performed
+
+ def _reassign_partition(self, partition):
+ new_consumer = None
+ for another_consumer, _ in self.sorted_current_subscriptions:
+ if partition in self.consumer_to_all_potential_partitions[another_consumer]:
+ new_consumer = another_consumer
+ break
+ assert new_consumer is not None
+ self._reassign_partition_to_consumer(partition, new_consumer)
+
+ def _reassign_partition_to_consumer(self, partition, new_consumer):
+ consumer = self.current_partition_consumer[partition]
+ # find the correct partition movement considering the stickiness requirement
+ partition_to_be_moved = self.partition_movements.get_partition_to_be_moved(partition, consumer, new_consumer)
+ self._move_partition(partition_to_be_moved, new_consumer)
+
+ def _move_partition(self, partition, new_consumer):
+ old_consumer = self.current_partition_consumer[partition]
+ self._remove_consumer_from_current_subscriptions_and_maintain_order(old_consumer)
+ self._remove_consumer_from_current_subscriptions_and_maintain_order(new_consumer)
+
+ self.partition_movements.move_partition(partition, old_consumer, new_consumer)
+
+ self.current_assignment[old_consumer].remove(partition)
+ self.current_assignment[new_consumer].append(partition)
+ self.current_partition_consumer[partition] = new_consumer
+
+ self._add_consumer_to_current_subscriptions_and_maintain_order(new_consumer)
+ self._add_consumer_to_current_subscriptions_and_maintain_order(old_consumer)
+
+ @staticmethod
+ def _get_balance_score(assignment):
+ """Calculates a balance score of a give assignment
+ as the sum of assigned partitions size difference of all consumer pairs.
+ A perfectly balanced assignment (with all consumers getting the same number of partitions)
+ has a balance score of 0. Lower balance score indicates a more balanced assignment.
+
+ Arguments:
+ assignment (dict): {consumer: list of assigned topic partitions}
+
+ Returns:
+ the balance score of the assignment
+ """
+ score = 0
+ consumer_to_assignment = {}
+ for consumer_id, partitions in six.iteritems(assignment):
+ consumer_to_assignment[consumer_id] = len(partitions)
+
+ consumers_to_explore = set(consumer_to_assignment.keys())
+ for consumer_id in consumer_to_assignment.keys():
+ if consumer_id in consumers_to_explore:
+ consumers_to_explore.remove(consumer_id)
+ for other_consumer_id in consumers_to_explore:
+ score += abs(consumer_to_assignment[consumer_id] - consumer_to_assignment[other_consumer_id])
+ return score
+
+
+class StickyPartitionAssignor(AbstractPartitionAssignor):
+ """
+ https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
+
+ The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
+ - the numbers of topic partitions assigned to consumers differ by at most one; or
+ - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
+
+ Second, it preserves as many existing assignments as possible when a reassignment occurs.
+ This helps in saving some of the overhead processing when topic partitions move from one consumer to another.
+
+ Starting fresh, it works by distributing the partitions over consumers as evenly as possible.
+ Even though this may sound similar to how the round robin assignor works, the second example below shows that it is not.
+ During a reassignment it would perform the reassignment in such a way that in the new assignment
+ - topic partitions are still distributed as evenly as possible, and
+ - topic partitions stay with their previously assigned consumers as much as possible.
+
+ The first goal above takes precedence over the second one.
+
+ Example 1.
+ Suppose there are three consumers C0, C1, C2,
+ four topics t0, t1, t2, t3, and each topic has 2 partitions,
+ resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.
+ Each consumer is subscribed to all four topics.
+
+ The assignment with both sticky and round robin assignors will be:
+ - C0: [t0p0, t1p1, t3p0]
+ - C1: [t0p1, t2p0, t3p1]
+ - C2: [t1p0, t2p1]
+
+ Now, let's assume C1 is removed and a reassignment is about to happen. The round robin assignor would produce:
+ - C0: [t0p0, t1p0, t2p0, t3p0]
+ - C2: [t0p1, t1p1, t2p1, t3p1]
+
+ while the sticky assignor would result in:
+ - C0 [t0p0, t1p1, t3p0, t2p0]
+ - C2 [t1p0, t2p1, t0p1, t3p1]
+ preserving all the previous assignments (unlike the round robin assignor).
+
+
+ Example 2.
+ There are three consumers C0, C1, C2,
+ and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively.
+ Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.
+ C0 is subscribed to t0;
+ C1 is subscribed to t0, t1;
+ and C2 is subscribed to t0, t1, t2.
+
+ The round robin assignor would come up with the following assignment:
+ - C0 [t0p0]
+ - C1 [t1p0]
+ - C2 [t1p1, t2p0, t2p1, t2p2]
+
+ which is not as balanced as the assignment suggested by the sticky assignor:
+ - C0 [t0p0]
+ - C1 [t1p0, t1p1]
+ - C2 [t2p0, t2p1, t2p2]
+
+ Now, if consumer C0 is removed, these two assignors would produce the following assignments.
+ Round Robin (preserves 3 partition assignments):
+ - C1 [t0p0, t1p1]
+ - C2 [t1p0, t2p0, t2p1, t2p2]
+
+ Sticky (preserves 5 partition assignments):
+ - C1 [t1p0, t1p1, t0p0]
+ - C2 [t2p0, t2p1, t2p2]
+ """
+
+ DEFAULT_GENERATION_ID = -1
+
+ name = "sticky"
+ version = 0
+
+ member_assignment = None
+ generation = DEFAULT_GENERATION_ID
+
+ _latest_partition_movements = None
+
+ @classmethod
+ def assign(cls, cluster, members):
+ """Performs group assignment given cluster metadata and member subscriptions
+
+ Arguments:
+ cluster (ClusterMetadata): cluster metadata
+ members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group.
+
+ Returns:
+ dict: {member_id: MemberAssignment}
+ """
+ members_metadata = {}
+ for consumer, member_metadata in six.iteritems(members):
+ members_metadata[consumer] = cls.parse_member_metadata(member_metadata)
+
+ executor = StickyAssignmentExecutor(cluster, members_metadata)
+ executor.perform_initial_assignment()
+ executor.balance()
+
+ cls._latest_partition_movements = executor.partition_movements
+
+ assignment = {}
+ for member_id in members:
+ assignment[member_id] = ConsumerProtocolMemberAssignment(
+ cls.version, sorted(executor.get_final_assignment(member_id)), b''
+ )
+ return assignment
+
+ @classmethod
+ def parse_member_metadata(cls, metadata):
+ """
+ Parses member metadata into a python object.
+ This implementation only serializes and deserializes the StickyAssignorMemberMetadataV1 user data,
+ since no StickyAssignor written in Python was ever deployed in the wild with version V0, meaning that
+ there is no need to support backward compatibility with V0.
+
+ Arguments:
+ metadata (MemberMetadata): decoded metadata for a member of the group.
+
+ Returns:
+ parsed metadata (StickyAssignorMemberMetadataV1)
+ """
+ user_data = metadata.user_data
+ if not user_data:
+ return StickyAssignorMemberMetadataV1(
+ partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
+ )
+
+ try:
+ decoded_user_data = StickyAssignorUserDataV1.decode(user_data)
+ except Exception as e:
+ # ignore the consumer's previous assignment if it cannot be parsed
+ log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args
+ return StickyAssignorMemberMetadataV1(
+ partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
+ )
+
+ member_partitions = []
+ for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member
+ member_partitions.extend([TopicPartition(topic, partition) for partition in partitions])
+ return StickyAssignorMemberMetadataV1(
+ # pylint: disable=no-member
+ partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription
+ )
+
+ @classmethod
+ def metadata(cls, topics):
+ if cls.member_assignment is None:
+ log.debug("No member assignment available")
+ user_data = b''
+ else:
+ log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
+ partitions_by_topic = defaultdict(list)
+ for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable
+ partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
+ data = StickyAssignorUserDataV1(six.iteritems(partitions_by_topic), cls.generation)
+ user_data = data.encode()
+ return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)
+
+ @classmethod
+ def on_assignment(cls, assignment):
+ """Callback that runs on each assignment. Updates assignor's state.
+
+ Arguments:
+ assignment: MemberAssignment
+ """
+ log.debug("On assignment: assignment={}".format(assignment))
+ cls.member_assignment = assignment.partitions()
+
+ @classmethod
+ def on_generation_assignment(cls, generation):
+ """Callback that runs on each assignment. Updates assignor's generation id.
+
+ Arguments:
+ generation: generation id
+ """
+ log.debug("On generation assignment: generation={}".format(generation))
+ cls.generation = generation
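
To make the balance check concrete: _get_balance_score sums the pairwise differences in assignment sizes, and balance() keeps a reassignment only if its score is strictly lower than the pre-balance score. A hedged sketch computing the score for the two Example 2 outcomes from the class docstring above (partition names are placeholders, since the score depends only on list lengths):

    from kafka.coordinator.assignors.sticky.sticky_assignor import StickyAssignmentExecutor

    round_robin = {'C0': ['t0p0'], 'C1': ['t1p0'], 'C2': ['t1p1', 't2p0', 't2p1', 't2p2']}
    sticky = {'C0': ['t0p0'], 'C1': ['t1p0', 't1p1'], 'C2': ['t2p0', 't2p1', 't2p2']}

    # |1-1| + |1-4| + |1-4| = 6 versus |1-2| + |1-3| + |2-3| = 4; lower is more balanced.
    print(StickyAssignmentExecutor._get_balance_score(round_robin))  # 6
    print(StickyAssignmentExecutor._get_balance_score(sticky))       # 4
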
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index fda80aa..971f5e8 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -11,6 +11,7 @@ from kafka.vendor import six
from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
import kafka.errors as Errors
from kafka.future import Future
@@ -31,7 +32,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': None,
- 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
+ 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor),
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'max_poll_interval_ms': 300000,
@@ -234,6 +235,8 @@ class ConsumerCoordinator(BaseCoordinator):
# give the assignor a chance to update internal state
# based on the received assignment
assignor.on_assignment(assignment)
+ if assignor.name == 'sticky':
+ assignor.on_generation_assignment(generation)
# reschedule the auto commit starting from now
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
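
With the change above, the sticky assignor ships alongside the range and round robin assignors in the coordinator's defaults. To have a consumer group actually negotiate it, it can be passed explicitly to the consumer; a hedged usage sketch (broker address, topic, and group id are placeholders):

    from kafka import KafkaConsumer
    from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        # Listing only the sticky assignor makes the group coordinator select it
        # once every member of the group supports it.
        partition_assignment_strategy=[StickyPartitionAssignor],
    )
    for message in consumer:
        print(message.topic, message.partition, message.offset)
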
diff --git a/setup.py b/setup.py
index 5cb2e72..b92dd6e 100644
--- a/setup.py
+++ b/setup.py
@@ -7,6 +7,7 @@ from setuptools import setup, Command, find_packages
# since we can't import something we haven't built yet :)
exec(open('kafka/version.py').read())
+
class Tox(Command):
user_options = []
diff --git a/test/test_assignors.py b/test/test_assignors.py
index 0821caf..016ff8e 100644
--- a/test/test_assignors.py
+++ b/test/test_assignors.py
@@ -1,28 +1,45 @@
# pylint: skip-file
from __future__ import absolute_import
+from collections import defaultdict
+from random import randint, sample
+
import pytest
+from kafka.structs import TopicPartition
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment
+from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1
+from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata
+from kafka.vendor import six
+
+@pytest.fixture(autouse=True)
+def reset_sticky_assignor():
+ yield
+ StickyPartitionAssignor.member_assignment = None
+ StickyPartitionAssignor.generation = -1
-@pytest.fixture
-def cluster(mocker):
+
+def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lambda=None):
cluster = mocker.MagicMock()
- cluster.partitions_for_topic.return_value = set([0, 1, 2])
+ cluster.topics.return_value = topics
+ if topics_partitions is not None:
+ cluster.partitions_for_topic.return_value = topics_partitions
+ if topic_partitions_lambda is not None:
+ cluster.partitions_for_topic.side_effect = topic_partitions_lambda
return cluster
-def test_assignor_roundrobin(cluster):
+def test_assignor_roundrobin(mocker):
assignor = RoundRobinPartitionAssignor
member_metadata = {
- 'C0': assignor.metadata(set(['t0', 't1'])),
- 'C1': assignor.metadata(set(['t0', 't1'])),
+ 'C0': assignor.metadata({'t0', 't1'}),
+ 'C1': assignor.metadata({'t0', 't1'}),
}
+ cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
ret = assignor.assign(cluster, member_metadata)
expected = {
'C0': ConsumerProtocolMemberAssignment(
@@ -36,14 +53,15 @@ def test_assignor_roundrobin(cluster):
assert ret[member].encode() == expected[member].encode()
-def test_assignor_range(cluster):
+def test_assignor_range(mocker):
assignor = RangePartitionAssignor
member_metadata = {
- 'C0': assignor.metadata(set(['t0', 't1'])),
- 'C1': assignor.metadata(set(['t0', 't1'])),
+ 'C0': assignor.metadata({'t0', 't1'}),
+ 'C1': assignor.metadata({'t0', 't1'}),
}
+ cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
ret = assignor.assign(cluster, member_metadata)
expected = {
'C0': ConsumerProtocolMemberAssignment(
@@ -55,3 +73,808 @@ def test_assignor_range(cluster):
assert set(ret) == set(expected)
for member in ret:
assert ret[member].encode() == expected[member].encode()
+
+
+def test_sticky_assignor1(mocker):
+ """
+ Given: there are three consumers C0, C1, C2,
+ four topics t0, t1, t2, t3, and each topic has 2 partitions,
+ resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.
+ Each consumer is subscribed to all four topics.
+ Then: perform fresh assignment
+ Expected: the assignment is
+ - C0: [t0p0, t1p1, t3p0]
+ - C1: [t0p1, t2p0, t3p1]
+ - C2: [t1p0, t2p1]
+ Then: remove C1 consumer and perform the reassignment
+ Expected: the new assignment is
+ - C0 [t0p0, t1p1, t2p0, t3p0]
+ - C2 [t0p1, t1p0, t2p1, t3p1]
+ """
+ cluster = create_cluster(mocker, topics={'t0', 't1', 't2', 't3'}, topics_partitions={0, 1})
+
+ subscriptions = {
+ 'C0': {'t0', 't1', 't2', 't3'},
+ 'C1': {'t0', 't1', 't2', 't3'},
+ 'C2': {'t0', 't1', 't2', 't3'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ del subscriptions['C1']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C0': ConsumerProtocolMemberAssignment(
+ StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b''
+ ),
+ 'C2': ConsumerProtocolMemberAssignment(
+ StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b''
+ ),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_assignor2(mocker):
+ """
+ Given: there are three consumers C0, C1, C2,
+ and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively.
+ Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.
+ C0 is subscribed to t0;
+ C1 is subscribed to t0, t1;
+ and C2 is subscribed to t0, t1, t2.
+ Then: perform the assignment
+ Expected: the assignment is
+ - C0 [t0p0]
+ - C1 [t1p0, t1p1]
+ - C2 [t2p0, t2p1, t2p2]
+ Then: remove C0 and perform the assignment
+ Expected: the assignment is
+ - C1 [t0p0, t1p0, t1p1]
+ - C2 [t2p0, t2p1, t2p2]
+ """
+
+ partitions = {'t0': {0}, 't1': {0, 1}, 't2': {0, 1, 2}}
+ cluster = create_cluster(mocker, topics={'t0', 't1', 't2'}, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = {
+ 'C0': {'t0'},
+ 'C1': {'t0', 't1'},
+ 'C2': {'t0', 't1', 't2'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, [])
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ del subscriptions['C0']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_no_topic(mocker):
+ cluster = create_cluster(mocker, topics={}, topics_partitions={})
+
+ subscriptions = {
+ 'C': set(),
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_nonexisting_topic(mocker):
+ cluster = create_cluster(mocker, topics={}, topics_partitions={})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_one_topic(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker):
+ cluster = create_cluster(mocker, topics={'t', 'other-t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_multiple_topics(mocker):
+ cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t1', 't2'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_two_consumers_one_topic_one_partition(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0})
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_two_consumers_one_topic_two_partitions(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1})
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker):
+ partitions = {'t1': {0, 1, 2}, 't2': {0, 1}}
+ cluster = create_cluster(mocker, topics={'t1', 't2'}, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = {
+ 'C1': {'t1'},
+ 'C2': {'t1', 't2'},
+ 'C3': {'t1'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''),
+ 'C3': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_add_remove_consumer_one_topic(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C1': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(assignment, expected_assignment)
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(
+ topics, assignment[member].partitions() if member in assignment else []
+ )
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions = {
+ 'C2': {'t'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert len(assignment['C2'].assignment[0][1]) == 3
+
+
+def test_sticky_add_remove_topic_two_consumers(mocker):
+ cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C1': {'t1'},
+ 'C2': {'t1'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ subscriptions = {
+ 'C1': {'t1', 't2'},
+ 'C2': {'t1', 't2'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ subscriptions = {
+ 'C1': {'t2'},
+ 'C2': {'t2'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_reassignment_after_one_consumer_leaves(mocker):
+ partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 20)])
+ cluster = create_cluster(
+ mocker, topics=set(['t{}'.format(i) for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t]
+ )
+
+ subscriptions = {}
+ for i in range(1, 20):
+ topics = set()
+ for j in range(1, i + 1):
+ topics.add('t{}'.format(j))
+ subscriptions['C{}'.format(i)] = topics
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ del subscriptions['C10']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_sticky_reassignment_after_one_consumer_added(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions=set(range(20)))
+
+ subscriptions = defaultdict(set)
+ for i in range(1, 10):
+ subscriptions['C{}'.format(i)] = {'t'}
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions['C10'] = {'t'}
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(
+ topics, assignment[member].partitions() if member in assignment else []
+ )
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_sticky_same_subscriptions(mocker):
+ partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 15)])
+ cluster = create_cluster(
+ mocker, topics=set(['t{}'.format(i) for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t]
+ )
+
+ subscriptions = defaultdict(set)
+ for i in range(1, 9):
+ for j in range(1, len(six.viewkeys(partitions)) + 1):
+ subscriptions['C{}'.format(i)].add('t{}'.format(j))
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ del subscriptions['C5']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
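+ # Stress test: 40 topics with random partition counts and 200 consumers with random
+ # subscriptions; up to 50 randomly chosen consumers then leave the group.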
+ n_topics = 40
+ n_consumers = 200
+
+ all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)])
+ partitions = dict([(t, set(range(1, randint(0, 10) + 1))) for t in all_topics])
+ cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = defaultdict(set)
+ for i in range(1, n_consumers + 1):
+ for j in range(0, randint(1, 20)):
+ subscriptions['C{}'.format(i)].add('t{}'.format(randint(1, n_topics)))
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ for i in range(50):
+ member = 'C{}'.format(randint(1, n_consumers))
+ if member in subscriptions:
+ del subscriptions[member]
+ del member_metadata[member]
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_new_subscription(mocker):
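+ # After the initial assignment, C0 adds t1 to its subscription and the group
+ # rebalances (members report no previously owned partitions here).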
+ cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4'}, topics_partitions={0})
+
+ subscriptions = defaultdict(set)
+ for i in range(3):
+ for j in range(i, 3 * i - 2 + 1):
+ subscriptions['C{}'.format(i)].add('t{}'.format(j))
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions['C0'].add('t1')
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, [])
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_move_existing_assignments(mocker):
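+ # Members report previously owned partitions via user data; the assignor may move
+ # some of them in order to produce a valid, balanced assignment.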
+ cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4', 't5', 't6'}, topics_partitions={0})
+
+ subscriptions = {
+ 'C1': {'t1', 't2'},
+ 'C2': {'t1', 't2', 't3', 't4'},
+ 'C3': {'t2', 't3', 't4', 't5', 't6'},
+ }
+ member_assignments = {
+ 'C1': [TopicPartition('t1', 0)],
+ 'C2': [TopicPartition('t2', 0), TopicPartition('t3', 0)],
+ 'C3': [TopicPartition('t4', 0), TopicPartition('t5', 0), TopicPartition('t6', 0)],
+ }
+
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, member_assignments[member])
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+
+def test_stickiness(mocker):
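+ # Three partitions shared by four consumers, so each gets at most one; after C1
+ # leaves, consumers that already owned a partition must keep it.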
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ 'C3': {'t'},
+ 'C4': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ partitions_assigned = {}
+ for consumer, consumer_assignment in six.iteritems(assignment):
+ assert (
+ len(consumer_assignment.partitions()) <= 1
+ ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer)
+ if len(consumer_assignment.partitions()) == 1:
+ partitions_assigned[consumer] = consumer_assignment.partitions()[0]
+
+ # removing the potential group leader
+ del subscriptions['C1']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+ for consumer, consumer_assignment in six.iteritems(assignment):
+ assert (
+ len(consumer_assignment.partitions()) <= 1
+ ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer)
+ assert (
+ consumer not in partitions_assigned or partitions_assigned[consumer] in consumer_assignment.partitions()
+ ), 'Stickiness was not honored for consumer {}'.format(consumer)
+
+
+def test_assignment_updated_for_deleted_topic(mocker):
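+ # 't2' is subscribed to but no longer exists in the cluster metadata, so only
+ # t1 and t3 partitions end up in the assignment.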
+ def topic_partitions(topic):
+ if topic == 't1':
+ return {0}
+ if topic == 't3':
+ return set(range(100))
+
+ cluster = create_cluster(mocker, topics={'t1', 't3'}, topic_partitions_lambda=topic_partitions)
+
+ subscriptions = {
+ 'C': {'t1', 't2', 't3'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
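+ # Once the only subscribed topic is deleted, a follow-up rebalance should yield an
+ # empty assignment instead of raising.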
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ subscriptions = {
+ 'C': set(),
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ cluster = create_cluster(mocker, topics=set(), topics_partitions=set())
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_conflicting_previous_assignments(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1})
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ # assume both C1 and C2 claim partition 0 (listed twice in their metadata) from generation 1
+ member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+
+@pytest.mark.parametrize(
+ 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)]
+)
+def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers):
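+ # Random subscriptions are reshuffled between two assignment rounds; the second
+ # round must remain valid, balanced and sticky.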
+ all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)])
+ partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)])
+ cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = defaultdict(set)
+ for i in range(n_consumers):
+ topics_sample = sample(all_topics, randint(1, len(all_topics) - 1))
+ subscriptions['C{}'.format(i)].update(topics_sample)
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions = defaultdict(set)
+ for i in range(n_consumers):
+ topics_sample = sample(all_topics, randint(1, len(all_topics) - 1))
+ subscriptions['C{}'.format(i)].update(topics_sample)
+
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_assignment_with_multiple_generations1(mocker):
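+ # Members rejoin carrying ownership data from different generations (C2 at
+ # generation 2, C3 at generation 1); assignments must stay sticky across rounds.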
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, []),
+ 'C2': build_metadata({'t'}, []),
+ 'C3': build_metadata({'t'}, []),
+ }
+
+ assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
+ assert len(assignment1['C1'].assignment[0][1]) == 2
+ assert len(assignment1['C2'].assignment[0][1]) == 2
+ assert len(assignment1['C3'].assignment[0][1]) == 2
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
+ 'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
+ }
+
+ assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2)
+ assert len(assignment2['C1'].assignment[0][1]) == 3
+ assert len(assignment2['C2'].assignment[0][1]) == 3
+ assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]])
+ assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+ member_metadata = {
+ 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ }
+
+ assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3)
+ assert len(assignment3['C2'].assignment[0][1]) == 3
+ assert len(assignment3['C3'].assignment[0][1]) == 3
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_assignment_with_multiple_generations2(mocker):
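+ # C2 temporarily owns all six partitions (generation 2); when C1 and C3 rejoin with
+ # generation 1 data, each member ends up with its original partitions.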
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, []),
+ 'C2': build_metadata({'t'}, []),
+ 'C3': build_metadata({'t'}, []),
+ }
+
+ assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
+ assert len(assignment1['C1'].assignment[0][1]) == 2
+ assert len(assignment1['C2'].assignment[0][1]) == 2
+ assert len(assignment1['C3'].assignment[0][1]) == 2
+
+ member_metadata = {
+ 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
+ }
+
+ assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C2': {'t'}}, assignment2)
+ assert len(assignment2['C2'].assignment[0][1]) == 6
+ assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
+ 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ }
+
+ assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+ assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1])
+ assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1])
+ assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1])
+
+
+@pytest.mark.parametrize('execution_number', range(50))
+def test_assignment_with_conflicting_previous_generations(mocker, execution_number):
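+ # Ownership claims conflict both within generation 1 (partition 0 claimed by C1 and C2)
+ # and across generations (partitions 3 and 4); the result must still be valid and balanced.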
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
+
+ member_assignments = {
+ 'C1': [TopicPartition('t', p) for p in {0, 1, 4}],
+ 'C2': [TopicPartition('t', p) for p in {0, 2, 3}],
+ 'C3': [TopicPartition('t', p) for p in {3, 4, 5}],
+ }
+ member_generations = {
+ 'C1': 1,
+ 'C2': 1,
+ 'C3': 2,
+ }
+ member_metadata = {}
+ for member in six.iterkeys(member_assignments):
+ member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def make_member_metadata(subscriptions):
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, [])
+ return member_metadata
+
+
+def build_metadata(topics, member_assignment_partitions, generation=-1):
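+ # Encodes previously owned partitions and the member's generation as sticky assignor
+ # user data, as a consumer would include in its join metadata.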
+ partitions_by_topic = defaultdict(list)
+ for topic_partition in member_assignment_partitions:
+ partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
+ data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
+ user_data = data.encode()
+ return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)
+
+
+def assert_assignment(result_assignment, expected_assignment):
+ assert result_assignment == expected_assignment
+ assert set(result_assignment) == set(expected_assignment)
+ for member in result_assignment:
+ assert result_assignment[member].encode() == expected_assignment[member].encode()
+
+
+def verify_validity_and_balance(subscriptions, assignment):
+ """
+ Verifies that the given assignment is valid with respect to the given subscriptions
+ Validity requirements:
+ - each consumer is subscribed to topics of all partitions assigned to it, and
+ - each partition is assigned to no more than one consumer
+ Balance requirements:
+ - the assignment is fully balanced (the numbers of topic partitions assigned to consumers differ by at most one), or
+ - no topic partition can be moved from one consumer to another consumer that has at least two fewer topic partitions assigned to it
+
+ :param subscriptions: topic subscriptions of each consumer
+ :param assignment: assignment to check for validity and balance
+ """
+ assert six.viewkeys(subscriptions) == six.viewkeys(assignment)
+
+ consumers = sorted(six.viewkeys(assignment))
+ for i in range(len(consumers)):
+ consumer = consumers[i]
+ partitions = assignment[consumer].partitions()
+ for partition in partitions:
+ assert partition.topic in subscriptions[consumer], (
+ 'Error: Partition {} is assigned to consumer {}, '
+ 'but it is not subscribed to topic {}\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(partition, consumers[i], partition.topic, subscriptions, assignment)
+ )
+ if i == len(consumers) - 1:
+ continue
+
+ for j in range(i + 1, len(consumers)):
+ other_consumer = consumers[j]
+ other_partitions = assignment[other_consumer].partitions()
+ partitions_intersection = set(partitions).intersection(set(other_partitions))
+ assert partitions_intersection == set(), (
+ 'Error: Consumers {} and {} have common partitions '
+ 'assigned to them: {}\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(consumer, other_consumer, partitions_intersection, subscriptions, assignment)
+ )
+
+ if abs(len(partitions) - len(other_partitions)) <= 1:
+ continue
+
+ assignments_by_topic = group_partitions_by_topic(partitions)
+ other_assignments_by_topic = group_partitions_by_topic(other_partitions)
+ if len(partitions) > len(other_partitions):
+ for topic in six.iterkeys(assignments_by_topic):
+ assert topic not in other_assignments_by_topic, (
+ 'Error: Some partitions can be moved from {} ({} partitions) '
+ 'to {} ({} partitions) '
+ 'to achieve a better balance\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(consumer, len(partitions), other_consumer, len(other_partitions), subscriptions, assignment)
+ )
+ if len(other_partitions) > len(partitions):
+ for topic in six.iterkeys(other_assignments_by_topic):
+ assert topic not in assignments_by_topic, (
+ 'Error: Some partitions can be moved from {} ({} partitions) '
+ 'to {} ({} partitions) '
+ 'to achieve a better balance\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(other_consumer, len(other_partitions), consumer, len(partitions), subscriptions, assignment)
+ )
+
+
+def group_partitions_by_topic(partitions):
+ result = defaultdict(set)
+ for p in partitions:
+ result[p.topic].add(p.partition)
+ return result
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index ea8f84b..a35cdd1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -9,6 +9,7 @@ from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
from kafka.coordinator.base import Generation, MemberState, HeartbeatThread
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
@@ -77,6 +78,10 @@ def test_group_protocols(coordinator):
RoundRobinPartitionAssignor.version,
['foobar'],
b'')),
+ ('sticky', ConsumerProtocolMemberMetadata(
+ StickyPartitionAssignor.version,
+ ['foobar'],
+ b'')),
]
@@ -95,7 +100,7 @@ def test_pattern_subscription(coordinator, api_version):
[(0, 'fizz', []),
(0, 'foo1', [(0, 0, 0, [], [])]),
(0, 'foo2', [(0, 0, 1, [], [])])]))
- assert coordinator._subscription.subscription == set(['foo1', 'foo2'])
+ assert coordinator._subscription.subscription == {'foo1', 'foo2'}
# 0.9 consumers should trigger dynamic partition assignment
if api_version >= (0, 9):
@@ -103,14 +108,14 @@ def test_pattern_subscription(coordinator, api_version):
# earlier consumers get all partitions assigned locally
else:
- assert set(coordinator._subscription.assignment.keys()) == set([
- TopicPartition('foo1', 0),
- TopicPartition('foo2', 0)])
+ assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0),
+ TopicPartition('foo2', 0)}
def test_lookup_assignor(coordinator):
assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor
assert coordinator._lookup_assignor('range') is RangePartitionAssignor
+ assert coordinator._lookup_assignor('sticky') is StickyPartitionAssignor
assert coordinator._lookup_assignor('foobar') is None
@@ -121,10 +126,25 @@ def test_join_complete(mocker, coordinator):
mocker.spy(assignor, 'on_assignment')
assert assignor.on_assignment.call_count == 0
assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
- coordinator._on_join_complete(
- 0, 'member-foo', 'roundrobin', assignment.encode())
+ coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode())
+ assert assignor.on_assignment.call_count == 1
+ assignor.on_assignment.assert_called_with(assignment)
+
+
+def test_join_complete_with_sticky_assignor(mocker, coordinator):
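+ # In addition to on_assignment, the sticky assignor should be notified of the
+ # generation via on_generation_assignment.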
+ coordinator._subscription.subscribe(topics=['foobar'])
+ assignor = StickyPartitionAssignor()
+ coordinator.config['assignors'] = (assignor,)
+ mocker.spy(assignor, 'on_assignment')
+ mocker.spy(assignor, 'on_generation_assignment')
+ assert assignor.on_assignment.call_count == 0
+ assert assignor.on_generation_assignment.call_count == 0
+ assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+ coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode())
assert assignor.on_assignment.call_count == 1
+ assert assignor.on_generation_assignment.call_count == 1
assignor.on_assignment.assert_called_with(assignment)
+ assignor.on_generation_assignment.assert_called_with(0)
def test_subscription_listener(mocker, coordinator):
@@ -141,9 +161,7 @@ def test_subscription_listener(mocker, coordinator):
coordinator._on_join_complete(
0, 'member-foo', 'roundrobin', assignment.encode())
assert listener.on_partitions_assigned.call_count == 1
- listener.on_partitions_assigned.assert_called_with(set([
- TopicPartition('foobar', 0),
- TopicPartition('foobar', 1)]))
+ listener.on_partitions_assigned.assert_called_with({TopicPartition('foobar', 0), TopicPartition('foobar', 1)})
def test_subscription_listener_failure(mocker, coordinator):
diff --git a/test/test_partition_movements.py b/test/test_partition_movements.py
new file mode 100644
index 0000000..bc990bf
--- /dev/null
+++ b/test/test_partition_movements.py
@@ -0,0 +1,23 @@
+from kafka.structs import TopicPartition
+
+from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
+
+
+def test_empty_movements_are_sticky():
+ partition_movements = PartitionMovements()
+ assert partition_movements.are_sticky()
+
+
+def test_sticky_movements():
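+ # Moving the same partition through a full cycle (C1 -> C2 -> C3 -> C1) leaves it
+ # where it started, so the movement history still counts as sticky.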
+ partition_movements = PartitionMovements()
+ partition_movements.move_partition(TopicPartition('t', 1), 'C1', 'C2')
+ partition_movements.move_partition(TopicPartition('t', 1), 'C2', 'C3')
+ partition_movements.move_partition(TopicPartition('t', 1), 'C3', 'C1')
+ assert partition_movements.are_sticky()
+
+
+def test_should_detect_non_sticky_assignment():
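+ # Partition 1 moves C1 -> C2 while partition 2 moves C2 -> C1; such a swap means
+ # stickiness was not preserved, so are_sticky() must report False.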
+ partition_movements = PartitionMovements()
+ partition_movements.move_partition(TopicPartition('t', 1), 'C1', 'C2')
+ partition_movements.move_partition(TopicPartition('t', 2), 'C2', 'C1')
+ assert not partition_movements.are_sticky()