summaryrefslogtreecommitdiff
path: root/test/test_coordinator.py
diff options
context:
space:
mode:
authorValeria Chernenko <aynroot@users.noreply.github.com>2020-09-30 06:03:54 +0200
committerGitHub <noreply@github.com>2020-09-29 21:03:54 -0700
commitc536dd28bc3c2db85d9b62a1e73d23a3eeaebd93 (patch)
tree40b412379666620a8a5173932652e94bdc9439b2 /test/test_coordinator.py
parentcb96a1a6c79c17ac9b3399b7a33bbaea7ad8886f (diff)
downloadkafka-python-c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93.tar.gz
KIP-54: Implement sticky partition assignment strategy (#2057)
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r--test/test_coordinator.py36
1 files changed, 27 insertions, 9 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index ea8f84b..a35cdd1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -9,6 +9,7 @@ from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
from kafka.coordinator.base import Generation, MemberState, HeartbeatThread
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
@@ -77,6 +78,10 @@ def test_group_protocols(coordinator):
RoundRobinPartitionAssignor.version,
['foobar'],
b'')),
+ ('sticky', ConsumerProtocolMemberMetadata(
+ StickyPartitionAssignor.version,
+ ['foobar'],
+ b'')),
]
@@ -95,7 +100,7 @@ def test_pattern_subscription(coordinator, api_version):
[(0, 'fizz', []),
(0, 'foo1', [(0, 0, 0, [], [])]),
(0, 'foo2', [(0, 0, 1, [], [])])]))
- assert coordinator._subscription.subscription == set(['foo1', 'foo2'])
+ assert coordinator._subscription.subscription == {'foo1', 'foo2'}
# 0.9 consumers should trigger dynamic partition assignment
if api_version >= (0, 9):
@@ -103,14 +108,14 @@ def test_pattern_subscription(coordinator, api_version):
# earlier consumers get all partitions assigned locally
else:
- assert set(coordinator._subscription.assignment.keys()) == set([
- TopicPartition('foo1', 0),
- TopicPartition('foo2', 0)])
+ assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0),
+ TopicPartition('foo2', 0)}
def test_lookup_assignor(coordinator):
assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor
assert coordinator._lookup_assignor('range') is RangePartitionAssignor
+ assert coordinator._lookup_assignor('sticky') is StickyPartitionAssignor
assert coordinator._lookup_assignor('foobar') is None
@@ -121,10 +126,25 @@ def test_join_complete(mocker, coordinator):
mocker.spy(assignor, 'on_assignment')
assert assignor.on_assignment.call_count == 0
assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
- coordinator._on_join_complete(
- 0, 'member-foo', 'roundrobin', assignment.encode())
+ coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode())
+ assert assignor.on_assignment.call_count == 1
+ assignor.on_assignment.assert_called_with(assignment)
+
+
+def test_join_complete_with_sticky_assignor(mocker, coordinator):
+ coordinator._subscription.subscribe(topics=['foobar'])
+ assignor = StickyPartitionAssignor()
+ coordinator.config['assignors'] = (assignor,)
+ mocker.spy(assignor, 'on_assignment')
+ mocker.spy(assignor, 'on_generation_assignment')
+ assert assignor.on_assignment.call_count == 0
+ assert assignor.on_generation_assignment.call_count == 0
+ assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+ coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode())
assert assignor.on_assignment.call_count == 1
+ assert assignor.on_generation_assignment.call_count == 1
assignor.on_assignment.assert_called_with(assignment)
+ assignor.on_generation_assignment.assert_called_with(0)
def test_subscription_listener(mocker, coordinator):
@@ -141,9 +161,7 @@ def test_subscription_listener(mocker, coordinator):
coordinator._on_join_complete(
0, 'member-foo', 'roundrobin', assignment.encode())
assert listener.on_partitions_assigned.call_count == 1
- listener.on_partitions_assigned.assert_called_with(set([
- TopicPartition('foobar', 0),
- TopicPartition('foobar', 1)]))
+ listener.on_partitions_assigned.assert_called_with({TopicPartition('foobar', 0), TopicPartition('foobar', 1)})
def test_subscription_listener_failure(mocker, coordinator):