From b090b21f07a1c7b89afb5dc36114aa2d37c580f0 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Fri, 6 Nov 2020 06:00:38 +0100 Subject: Cover sticky assignor's metadata method with tests (#2161) --- .../assignors/sticky/sticky_assignor.py | 10 ++- test/test_assignors.py | 75 ++++++++++------------ 2 files changed, 40 insertions(+), 45 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index eb83c01..dce714f 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -648,15 +648,19 @@ class StickyPartitionAssignor(AbstractPartitionAssignor): @classmethod def metadata(cls, topics): - if cls.member_assignment is None: + return cls._metadata(topics, cls.member_assignment, cls.generation) + + @classmethod + def _metadata(cls, topics, member_assignment_partitions, generation=-1): + if member_assignment_partitions is None: log.debug("No member assignment available") user_data = b'' else: log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) partitions_by_topic = defaultdict(list) - for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable + for topic_partition in member_assignment_partitions: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), cls.generation) + data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation) user_data = data.encode() return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) diff --git a/test/test_assignors.py b/test/test_assignors.py index 016ff8e..67e91e1 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker): del subscriptions['C1'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, []) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker): del subscriptions['C0'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata( + member_metadata[member] = StickyPartitionAssignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) @@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker): del subscriptions['C10'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): subscriptions['C10'] = {'t'} member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata( + member_metadata[member] = StickyPartitionAssignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker): del subscriptions['C5'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) for i in range(50): member = 'C{}'.format(randint(1, n_consumers)) @@ -517,7 +517,7 @@ def test_new_subscription(mocker): subscriptions['C0'].add('t1') member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, []) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker): member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, member_assignments[member]) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member]) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -570,7 +570,7 @@ def test_stickiness(mocker): del subscriptions['C1'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) cluster = create_cluster(mocker, topics={}, topics_partitions={}) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker): member_metadata = {} for member, topics in six.iteritems(subscriptions): # assume both C1 and C2 have partition 1 assigned to them in generation 1 - member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_metadata = { - 'C1': build_metadata({'t'}, []), - 'C2': build_metadata({'t'}, []), - 'C3': build_metadata({'t'}, []), + 'C1': StickyPartitionAssignor._metadata({'t'}, []), + 'C2': StickyPartitionAssignor._metadata({'t'}, []), + 'C3': StickyPartitionAssignor._metadata({'t'}, []), } assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker): assert len(assignment1['C3'].assignment[0][1]) == 2 member_metadata = { - 'C1': build_metadata({'t'}, assignment1['C1'].partitions()), - 'C2': build_metadata({'t'}, assignment1['C2'].partitions()), + 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()), } assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker): assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { - 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2), - 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_metadata = { - 'C1': build_metadata({'t'}, []), - 'C2': build_metadata({'t'}, []), - 'C3': build_metadata({'t'}, []), + 'C1': StickyPartitionAssignor._metadata({'t'}, []), + 'C2': StickyPartitionAssignor._metadata({'t'}, []), + 'C3': StickyPartitionAssignor._metadata({'t'}, []), } assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker): assert len(assignment1['C3'].assignment[0][1]) == 2 member_metadata = { - 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1), } assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker): assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { - 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1), - 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2), - 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1), + 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb } member_metadata = {} for member in six.iterkeys(member_assignments): - member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member]) + member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member]) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment) @@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb def make_member_metadata(subscriptions): member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, []) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) return member_metadata -def build_metadata(topics, member_assignment_partitions, generation=-1): - partitions_by_topic = defaultdict(list) - for topic_partition in member_assignment_partitions: - partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation) - user_data = data.encode() - return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data) - - def assert_assignment(result_assignment, expected_assignment): assert result_assignment == expected_assignment assert set(result_assignment) == set(expected_assignment) -- cgit v1.2.1