summary | refs | log | tree | commit | diff
path: root/test/test_assignors.py
diff options
context:
space:
mode:
author: Valeria Chernenko <aynroot@users.noreply.github.com> 2020-09-30 06:03:54 +0200
committer: GitHub <noreply@github.com> 2020-09-29 21:03:54 -0700
commit: c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93 (patch)
tree: 40b412379666620a8a5173932652e94bdc9439b2 /test/test_assignors.py
parent: cb96a1a6c79c17ac9b3399b7a33bbaea7ad8886f (diff)
download: kafka-python-c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93.tar.gz
KIP-54: Implement sticky partition assignment strategy (#2057)
Diffstat (limited to 'test/test_assignors.py')
-rw-r--r-- test/test_assignors.py 843
1 files changed, 833 insertions, 10 deletions
diff --git a/test/test_assignors.py b/test/test_assignors.py
index 0821caf..016ff8e 100644
--- a/test/test_assignors.py
+++ b/test/test_assignors.py
@@ -1,28 +1,45 @@
# pylint: skip-file
from __future__ import absolute_import
+from collections import defaultdict
+from random import randint, sample
+
import pytest
+from kafka.structs import TopicPartition
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment
+from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1
+from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata
+from kafka.vendor import six
+
+@pytest.fixture(autouse=True)  # autouse: applied to every test in this module without being requested
+def reset_sticky_assignor():
+ yield  # the test runs at this point; the statements below execute afterwards as teardown
+ StickyPartitionAssignor.member_assignment = None  # reset class-level attribute after each test
+ StickyPartitionAssignor.generation = -1  # reset class-level attribute after each test
-@pytest.fixture
-def cluster(mocker):
+
+def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lambda=None):
cluster = mocker.MagicMock()
- cluster.partitions_for_topic.return_value = set([0, 1, 2])
+ cluster.topics.return_value = topics
+ if topics_partitions is not None:
+ cluster.partitions_for_topic.return_value = topics_partitions
+ if topic_partitions_lambda is not None:
+ cluster.partitions_for_topic.side_effect = topic_partitions_lambda
return cluster
-def test_assignor_roundrobin(cluster):
+def test_assignor_roundrobin(mocker):
assignor = RoundRobinPartitionAssignor
member_metadata = {
- 'C0': assignor.metadata(set(['t0', 't1'])),
- 'C1': assignor.metadata(set(['t0', 't1'])),
+ 'C0': assignor.metadata({'t0', 't1'}),
+ 'C1': assignor.metadata({'t0', 't1'}),
}
+ cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
ret = assignor.assign(cluster, member_metadata)
expected = {
'C0': ConsumerProtocolMemberAssignment(
@@ -36,14 +53,15 @@ def test_assignor_roundrobin(cluster):
assert ret[member].encode() == expected[member].encode()
-def test_assignor_range(cluster):
+def test_assignor_range(mocker):
assignor = RangePartitionAssignor
member_metadata = {
- 'C0': assignor.metadata(set(['t0', 't1'])),
- 'C1': assignor.metadata(set(['t0', 't1'])),
+ 'C0': assignor.metadata({'t0', 't1'}),
+ 'C1': assignor.metadata({'t0', 't1'}),
}
+ cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
ret = assignor.assign(cluster, member_metadata)
expected = {
'C0': ConsumerProtocolMemberAssignment(
@@ -55,3 +73,808 @@ def test_assignor_range(cluster):
assert set(ret) == set(expected)
for member in ret:
assert ret[member].encode() == expected[member].encode()
+
+
+def test_sticky_assignor1(mocker):
+ """
+ Given: there are three consumers C0, C1, C2,
+ four topics t0, t1, t2, t3, and each topic has 2 partitions,
+ resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.
+ Each consumer is subscribed to all four topics.
+ Then: perform fresh assignment
+ Expected: the assignment is
+ - C0: [t0p0, t1p1, t3p0]
+ - C1: [t0p1, t2p0, t3p1]
+ - C2: [t1p0, t2p1]
+ Then: remove C1 consumer and perform the reassignment
+ Expected: the new assignment is
+ - C0 [t0p0, t1p1, t2p0, t3p0]
+ - C2 [t0p1, t1p0, t2p1, t3p1]
+ """
+ cluster = create_cluster(mocker, topics={'t0', 't1', 't2', 't3'}, topics_partitions={0, 1})
+
+ subscriptions = {
+ 'C0': {'t0', 't1', 't2', 't3'},
+ 'C1': {'t0', 't1', 't2', 't3'},
+ 'C2': {'t0', 't1', 't2', 't3'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ del subscriptions['C1']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C0': ConsumerProtocolMemberAssignment(
+ StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b''
+ ),
+ 'C2': ConsumerProtocolMemberAssignment(
+ StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b''
+ ),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_assignor2(mocker):
+ """
+ Given: there are three consumers C0, C1, C2,
+ and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively.
+ Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.
+ C0 is subscribed to t0;
+ C1 is subscribed to t0, t1;
+ and C2 is subscribed to t0, t1, t2.
+ Then: perform the assignment
+ Expected: the assignment is
+ - C0 [t0p0]
+ - C1 [t1p0, t1p1]
+ - C2 [t2p0, t2p1, t2p2]
+ Then: remove C0 and perform the assignment
+ Expected: the assignment is
+ - C1 [t0p0, t1p0, t1p1]
+ - C2 [t2p0, t2p1, t2p2]
+ """
+
+ partitions = {'t0': {0}, 't1': {0, 1}, 't2': {0, 1, 2}}
+ cluster = create_cluster(mocker, topics={'t0', 't1', 't2'}, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = {
+ 'C0': {'t0'},
+ 'C1': {'t0', 't1'},
+ 'C2': {'t0', 't1', 't2'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, [])
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ del subscriptions['C0']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_no_topic(mocker):
+ cluster = create_cluster(mocker, topics={}, topics_partitions={})
+
+ subscriptions = {
+ 'C': set(),
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_nonexisting_topic(mocker):
+ cluster = create_cluster(mocker, topics={}, topics_partitions={})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_one_topic(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker):
+ cluster = create_cluster(mocker, topics={'t', 'other-t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_one_consumer_multiple_topics(mocker):
+ cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t1', 't2'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_two_consumers_one_topic_one_partition(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0})
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_two_consumers_one_topic_two_partitions(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1})
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker):
+ partitions = {'t1': {0, 1, 2}, 't2': {0, 1}}
+ cluster = create_cluster(mocker, topics={'t1', 't2'}, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = {
+ 'C1': {'t1'},
+ 'C2': {'t1', 't2'},
+ 'C3': {'t1'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''),
+ 'C3': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_add_remove_consumer_one_topic(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C1': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(assignment, expected_assignment)
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(
+ topics, assignment[member].partitions() if member in assignment else []
+ )
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions = {
+ 'C2': {'t'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert len(assignment['C2'].assignment[0][1]) == 3
+
+
+def test_sticky_add_remove_topic_two_consumers(mocker):
+ cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C1': {'t1'},
+ 'C2': {'t1'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ subscriptions = {
+ 'C1': {'t1', 't2'},
+ 'C2': {'t1', 't2'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ subscriptions = {
+ 'C1': {'t2'},
+ 'C2': {'t2'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''),
+ 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_sticky_reassignment_after_one_consumer_leaves(mocker):
+ partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 20)])
+ cluster = create_cluster(
+ mocker, topics=set(['t{}'.format(i) for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t]
+ )
+
+ subscriptions = {}
+ for i in range(1, 20):
+ topics = set()
+ for j in range(1, i + 1):
+ topics.add('t{}'.format(j))
+ subscriptions['C{}'.format(i)] = topics
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ del subscriptions['C10']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_sticky_reassignment_after_one_consumer_added(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions=set(range(20)))
+
+ subscriptions = defaultdict(set)
+ for i in range(1, 10):
+ subscriptions['C{}'.format(i)] = {'t'}
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions['C10'] = {'t'}
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(
+ topics, assignment[member].partitions() if member in assignment else []
+ )
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_sticky_same_subscriptions(mocker):
+ partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 15)])
+ cluster = create_cluster(
+ mocker, topics=set(['t{}'.format(i) for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t]
+ )
+
+ subscriptions = defaultdict(set)
+ for i in range(1, 9):
+ for j in range(1, len(six.viewkeys(partitions)) + 1):
+ subscriptions['C{}'.format(i)].add('t{}'.format(j))
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ del subscriptions['C5']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
+ n_topics = 40
+ n_consumers = 200
+
+ all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)])
+ partitions = dict([(t, set(range(1, randint(0, 10) + 1))) for t in all_topics])
+ cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = defaultdict(set)
+ for i in range(1, n_consumers + 1):
+ for j in range(0, randint(1, 20)):
+ subscriptions['C{}'.format(i)].add('t{}'.format(randint(1, n_topics)))
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ for i in range(50):
+ member = 'C{}'.format(randint(1, n_consumers))
+ if member in subscriptions:
+ del subscriptions[member]
+ del member_metadata[member]
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_new_subscription(mocker):
+ cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4'}, topics_partitions={0})
+
+ subscriptions = defaultdict(set)
+ for i in range(3):
+ for j in range(i, 3 * i - 2 + 1):
+ subscriptions['C{}'.format(i)].add('t{}'.format(j))
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions['C0'].add('t1')
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, [])
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_move_existing_assignments(mocker):
+ cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4', 't5', 't6'}, topics_partitions={0})
+
+ subscriptions = {
+ 'C1': {'t1', 't2'},
+ 'C2': {'t1', 't2', 't3', 't4'},
+ 'C3': {'t2', 't3', 't4', 't5', 't6'},
+ }
+ member_assignments = {
+ 'C1': [TopicPartition('t1', 0)],
+ 'C2': [TopicPartition('t2', 0), TopicPartition('t3', 0)],
+ 'C3': [TopicPartition('t4', 0), TopicPartition('t5', 0), TopicPartition('t6', 0)],
+ }
+
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, member_assignments[member])
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+
+def test_stickiness(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ 'C3': {'t'},
+ 'C4': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ partitions_assigned = {}
+ for consumer, consumer_assignment in six.iteritems(assignment):
+ assert (
+ len(consumer_assignment.partitions()) <= 1
+ ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer)
+ if len(consumer_assignment.partitions()) == 1:
+ partitions_assigned[consumer] = consumer_assignment.partitions()[0]
+
+ # removing the potential group leader
+ del subscriptions['C1']
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+ for consumer, consumer_assignment in six.iteritems(assignment):
+ assert (
+ len(consumer_assignment.partitions()) <= 1
+ ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer)
+ assert (
+ consumer not in partitions_assigned or partitions_assigned[consumer] in consumer_assignment.partitions()
+ ), 'Stickiness was not honored for consumer {}'.format(consumer)
+
+
+def test_assignment_updated_for_deleted_topic(mocker):
+ def topic_partitions(topic):
+ if topic == 't1':
+ return {0}
+ if topic == 't3':
+ return set(range(100))
+
+ cluster = create_cluster(mocker, topics={'t1', 't3'}, topic_partitions_lambda=topic_partitions)
+
+ subscriptions = {
+ 'C': {'t1', 't2', 't3'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
+
+ subscriptions = {
+ 'C': {'t'},
+ }
+ member_metadata = make_member_metadata(subscriptions)
+
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+ subscriptions = {
+ 'C': {},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+
+ cluster = create_cluster(mocker, topics={}, topics_partitions={})
+ sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ expected_assignment = {
+ 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
+ }
+ assert_assignment(sticky_assignment, expected_assignment)
+
+
+def test_conflicting_previous_assignments(mocker):  # two members claim the same partitions for the same generation
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1})
+
+ subscriptions = {
+ 'C1': {'t'},
+ 'C2': {'t'},
+ }
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ # assume both C1 and C2 have partitions 0 and 1 assigned to them in generation 1
+ member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 1)], 1)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)  # the conflict must still resolve to a valid, balanced result
+
+
+@pytest.mark.parametrize(
+ 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)]
+)
+def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers):
+ all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)])
+ partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)])
+ cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t])
+
+ subscriptions = defaultdict(set)
+ for i in range(n_consumers):
+ topics_sample = sample(sorted(all_topics), randint(1, len(all_topics) - 1))  # sample() needs a sequence; sets raise TypeError on Python 3.11+
+ subscriptions['C{}'.format(i)].update(topics_sample)
+
+ member_metadata = make_member_metadata(subscriptions)
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+
+ subscriptions = defaultdict(set)  # second round: every consumer gets a fresh random subscription
+ for i in range(n_consumers):
+ topics_sample = sample(sorted(all_topics), randint(1, len(all_topics) - 1))  # sample() needs a sequence; sets raise TypeError on Python 3.11+
+ subscriptions['C{}'.format(i)].update(topics_sample)
+
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, assignment[member].partitions())  # carry over the previous assignment as user data
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance(subscriptions, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_assignment_with_multiple_generations1(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, []),
+ 'C2': build_metadata({'t'}, []),
+ 'C3': build_metadata({'t'}, []),
+ }
+
+ assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
+ assert len(assignment1['C1'].assignment[0][1]) == 2
+ assert len(assignment1['C2'].assignment[0][1]) == 2
+ assert len(assignment1['C3'].assignment[0][1]) == 2
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
+ 'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
+ }
+
+ assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2)
+ assert len(assignment2['C1'].assignment[0][1]) == 3
+ assert len(assignment2['C2'].assignment[0][1]) == 3
+ assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]])
+ assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+ member_metadata = {
+ 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ }
+
+ assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3)
+ assert len(assignment3['C2'].assignment[0][1]) == 3
+ assert len(assignment3['C3'].assignment[0][1]) == 3
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def test_assignment_with_multiple_generations2(mocker):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, []),
+ 'C2': build_metadata({'t'}, []),
+ 'C3': build_metadata({'t'}, []),
+ }
+
+ assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
+ assert len(assignment1['C1'].assignment[0][1]) == 2
+ assert len(assignment1['C2'].assignment[0][1]) == 2
+ assert len(assignment1['C3'].assignment[0][1]) == 2
+
+ member_metadata = {
+ 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
+ }
+
+ assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C2': {'t'}}, assignment2)
+ assert len(assignment2['C2'].assignment[0][1]) == 6
+ assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+ member_metadata = {
+ 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
+ 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ }
+
+ assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+ assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1])
+ assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1])
+ assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1])
+
+
+@pytest.mark.parametrize('execution_number', range(50))
+def test_assignment_with_conflicting_previous_generations(mocker, execution_number):
+ cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
+
+ member_assignments = {
+ 'C1': [TopicPartition('t', p) for p in {0, 1, 4}],
+ 'C2': [TopicPartition('t', p) for p in {0, 2, 3}],
+ 'C3': [TopicPartition('t', p) for p in {3, 4, 5}],
+ }
+ member_generations = {
+ 'C1': 1,
+ 'C2': 1,
+ 'C3': 2,
+ }
+ member_metadata = {}
+ for member in six.iterkeys(member_assignments):
+ member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
+
+ assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
+ verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
+ assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
+
+
+def make_member_metadata(subscriptions):  # subscriptions: mapping of member id -> iterable of topic names
+ member_metadata = {}
+ for member, topics in six.iteritems(subscriptions):
+ member_metadata[member] = build_metadata(topics, [])  # empty list: no previously assigned partitions
+ return member_metadata  # mapping of member id -> metadata object returned by build_metadata
+
+
+def build_metadata(topics, member_assignment_partitions, generation=-1):  # builds one member's metadata, previous assignment included
+ partitions_by_topic = defaultdict(list)
+ for topic_partition in member_assignment_partitions:  # items must expose .topic and .partition
+ partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
+ data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)  # (topic, [partitions]) pairs plus the generation
+ user_data = data.encode()
+ return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)
+
+
+def assert_assignment(result_assignment, expected_assignment):  # both arguments: mapping of member id -> assignment object
+ assert result_assignment == expected_assignment  # NOTE(review): relies on the assignment objects' own equality semantics - confirm
+ assert set(result_assignment) == set(expected_assignment)  # same set of member ids
+ for member in result_assignment:
+ assert result_assignment[member].encode() == expected_assignment[member].encode()  # compare the encoded form per member
+
+
+def verify_validity_and_balance(subscriptions, assignment):
+ """
+ Verifies that the given assignment is valid with respect to the given subscriptions
+ Validity requirements:
+ - each consumer is subscribed to topics of all partitions assigned to it, and
+ - each partition is assigned to no more than one consumer
+ Balance requirements:
+ - the assignment is fully balanced (the numbers of topic partitions assigned to consumers differ by at most one), or
+ - there is no topic partition that can be moved from one consumer to another with 2+ fewer topic partitions
+
+ :param subscriptions: topic subscriptions of each consumer
+ :param assignment: given assignment for balance check
+ """
+ assert six.viewkeys(subscriptions) == six.viewkeys(assignment)
+
+ consumers = sorted(six.viewkeys(assignment))
+ for i in range(len(consumers)):
+ consumer = consumers[i]
+ partitions = assignment[consumer].partitions()
+ for partition in partitions:
+ assert partition.topic in subscriptions[consumer], (
+ 'Error: Partition {} is assigned to consumer {}, '
+ 'but it is not subscribed to topic {}\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(partition, consumers[i], partition.topic, subscriptions, assignment)
+ )
+ if i == len(consumers) - 1:  # the last consumer has no later consumer to be compared with
+ continue
+
+ for j in range(i + 1, len(consumers)):  # compare each unordered pair of consumers once
+ other_consumer = consumers[j]
+ other_partitions = assignment[other_consumer].partitions()
+ partitions_intersection = set(partitions).intersection(set(other_partitions))
+ assert partitions_intersection == set(), (
+ 'Error: Consumers {} and {} have common partitions '
+ 'assigned to them: {}\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(consumer, other_consumer, partitions_intersection, subscriptions, assignment)
+ )
+
+ if abs(len(partitions) - len(other_partitions)) <= 1:  # this pair is already balanced
+ continue
+
+ assignments_by_topic = group_partitions_by_topic(partitions)
+ other_assignments_by_topic = group_partitions_by_topic(other_partitions)
+ if len(partitions) > len(other_partitions):
+ for topic in six.iterkeys(assignments_by_topic):
+ assert topic not in other_assignments_by_topic, (
+ 'Error: Some partitions can be moved from {} ({} partitions) '
+ 'to {} ({} partitions) '
+ 'to achieve a better balance\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(consumer, len(partitions), other_consumer, len(other_partitions), subscriptions, assignment)
+ )
+ if len(other_partitions) > len(partitions):
+ for topic in six.iterkeys(other_assignments_by_topic):
+ assert topic not in assignments_by_topic, (
+ 'Error: Some partitions can be moved from {} ({} partitions) '
+ 'to {} ({} partitions) '
+ 'to achieve a better balance\n'
+ 'Subscriptions: {}\n'
+ 'Assignments: {}'.format(other_consumer, len(other_partitions), consumer, len(partitions), subscriptions, assignment)
+ )
+
+
+def group_partitions_by_topic(partitions):  # partitions: iterable of objects exposing .topic and .partition
+ result = defaultdict(set)
+ for p in partitions:
+ result[p.topic].add(p.partition)
+ return result  # defaultdict mapping topic -> set of partition values