diff options
author | Valeria Chernenko <aynroot@users.noreply.github.com> | 2020-09-30 06:03:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-29 21:03:54 -0700 |
commit | c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93 (patch) | |
tree | 40b412379666620a8a5173932652e94bdc9439b2 /test/test_coordinator.py | |
parent | cb96a1a6c79c17ac9b3399b7a33bbaea7ad8886f (diff) | |
download | kafka-python-c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93.tar.gz |
KIP-54: Implement sticky partition assignment strategy (#2057)
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r-- | test/test_coordinator.py | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index ea8f84b..a35cdd1 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -9,6 +9,7 @@ from kafka.consumer.subscription_state import ( SubscriptionState, ConsumerRebalanceListener) from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.base import Generation, MemberState, HeartbeatThread from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( @@ -77,6 +78,10 @@ def test_group_protocols(coordinator): RoundRobinPartitionAssignor.version, ['foobar'], b'')), + ('sticky', ConsumerProtocolMemberMetadata( + StickyPartitionAssignor.version, + ['foobar'], + b'')), ] @@ -95,7 +100,7 @@ def test_pattern_subscription(coordinator, api_version): [(0, 'fizz', []), (0, 'foo1', [(0, 0, 0, [], [])]), (0, 'foo2', [(0, 0, 1, [], [])])])) - assert coordinator._subscription.subscription == set(['foo1', 'foo2']) + assert coordinator._subscription.subscription == {'foo1', 'foo2'} # 0.9 consumers should trigger dynamic partition assignment if api_version >= (0, 9): @@ -103,14 +108,14 @@ def test_pattern_subscription(coordinator, api_version): # earlier consumers get all partitions assigned locally else: - assert set(coordinator._subscription.assignment.keys()) == set([ - TopicPartition('foo1', 0), - TopicPartition('foo2', 0)]) + assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0), + TopicPartition('foo2', 0)} def test_lookup_assignor(coordinator): assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor assert coordinator._lookup_assignor('range') is RangePartitionAssignor + assert coordinator._lookup_assignor('sticky') is StickyPartitionAssignor assert coordinator._lookup_assignor('foobar') is None @@ -121,10 +126,25 @@ def test_join_complete(mocker, coordinator): mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') - coordinator._on_join_complete( - 0, 'member-foo', 'roundrobin', assignment.encode()) + coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode()) + assert assignor.on_assignment.call_count == 1 + assignor.on_assignment.assert_called_with(assignment) + + +def test_join_complete_with_sticky_assignor(mocker, coordinator): + coordinator._subscription.subscribe(topics=['foobar']) + assignor = StickyPartitionAssignor() + coordinator.config['assignors'] = (assignor,) + mocker.spy(assignor, 'on_assignment') + mocker.spy(assignor, 'on_generation_assignment') + assert assignor.on_assignment.call_count == 0 + assert assignor.on_generation_assignment.call_count == 0 + assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode()) assert assignor.on_assignment.call_count == 1 + assert assignor.on_generation_assignment.call_count == 1 assignor.on_assignment.assert_called_with(assignment) + assignor.on_generation_assignment.assert_called_with(0) def test_subscription_listener(mocker, coordinator): @@ -141,9 +161,7 @@ def test_subscription_listener(mocker, coordinator): coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert listener.on_partitions_assigned.call_count == 1 - listener.on_partitions_assigned.assert_called_with(set([ - TopicPartition('foobar', 0), - TopicPartition('foobar', 1)])) + listener.on_partitions_assigned.assert_called_with({TopicPartition('foobar', 0), TopicPartition('foobar', 1)}) def test_subscription_listener_failure(mocker, coordinator): |