summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValeria Chernenko <aynroot@users.noreply.github.com>2020-11-06 06:00:38 +0100
committerGitHub <noreply@github.com>2020-11-05 21:00:38 -0800
commitb090b21f07a1c7b89afb5dc36114aa2d37c580f0 (patch)
treea25b4bcb1220cb237caaeab20acb461a757e75d6
parent83b7b2752fecdfef00bb03e7c2a3ac1fe8c2d0d3 (diff)
downloadkafka-python-b090b21f07a1c7b89afb5dc36114aa2d37c580f0.tar.gz
Cover sticky assignor's metadata method with tests (#2161)
-rw-r--r--kafka/coordinator/assignors/sticky/sticky_assignor.py10
-rw-r--r--test/test_assignors.py75
2 files changed, 40 insertions, 45 deletions
diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py
index eb83c01..dce714f 100644
--- a/kafka/coordinator/assignors/sticky/sticky_assignor.py
+++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py
@@ -648,15 +648,19 @@ class StickyPartitionAssignor(AbstractPartitionAssignor):
@classmethod
def metadata(cls, topics):
- if cls.member_assignment is None:
+ return cls._metadata(topics, cls.member_assignment, cls.generation)
+
+ @classmethod
+ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
+ if member_assignment_partitions is None:
log.debug("No member assignment available")
user_data = b''
else:
log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
partitions_by_topic = defaultdict(list)
- for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable
+ for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
- data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), cls.generation)
+ data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)
diff --git a/test/test_assignors.py b/test/test_assignors.py
index 016ff8e..67e91e1 100644
--- a/test/test_assignors.py
+++ b/test/test_assignors.py
@@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, [])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker):
del subscriptions['C0']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(
+ member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)
@@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker):
del subscriptions['C10']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker):
subscriptions['C10'] = {'t'}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(
+ member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker):
del subscriptions['C5']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
@@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
for i in range(50):
member = 'C{}'.format(randint(1, n_consumers))
@@ -517,7 +517,7 @@ def test_new_subscription(mocker):
subscriptions['C0'].add('t1')
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, [])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, member_assignments[member])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member])
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -570,7 +570,7 @@ def test_stickiness(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
cluster = create_cluster(mocker, topics={}, topics_partitions={})
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
# assume both C1 and C2 have partition 1 assigned to them in generation 1
- member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
member_metadata = {
- 'C1': build_metadata({'t'}, []),
- 'C2': build_metadata({'t'}, []),
- 'C3': build_metadata({'t'}, []),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, []),
}
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2
member_metadata = {
- 'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
- 'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()),
}
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
member_metadata = {
- 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
- 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
member_metadata = {
- 'C1': build_metadata({'t'}, []),
- 'C2': build_metadata({'t'}, []),
- 'C3': build_metadata({'t'}, []),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, []),
}
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2
member_metadata = {
- 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
}
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
member_metadata = {
- 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
- 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
- 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
}
member_metadata = {}
for member in six.iterkeys(member_assignments):
- member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
+ member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member])
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
@@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
def make_member_metadata(subscriptions):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, [])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
return member_metadata
-def build_metadata(topics, member_assignment_partitions, generation=-1):
- partitions_by_topic = defaultdict(list)
- for topic_partition in member_assignment_partitions:
- partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
- data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
- user_data = data.encode()
- return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)
-
-
def assert_assignment(result_assignment, expected_assignment):
assert result_assignment == expected_assignment
assert set(result_assignment) == set(expected_assignment)