diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-10 03:15:47 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-10 11:10:23 -0800 |
commit | 240f7029def4027bfccde7b8627c978ab1fdd5a6 (patch) | |
tree | c1a177029082b792a76d4893734ce5830992dbe3 /test/test_coordinator.py | |
parent | bbd6444e85a3062224a977f1033da3f393110b87 (diff) | |
download | kafka-python-240f7029def4027bfccde7b8627c978ab1fdd5a6.tar.gz |
Add ConsumerCoordinator unit tests
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r-- | test/test_coordinator.py | 568 |
1 files changed, 568 insertions, 0 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py new file mode 100644 index 0000000..f7c5772 --- /dev/null +++ b/test/test_coordinator.py @@ -0,0 +1,568 @@ +# pylint: skip-file +from __future__ import absolute_import + +import pytest + +from kafka.client_async import KafkaClient +from kafka.common import TopicPartition, OffsetAndMetadata +from kafka.consumer.subscription_state import ( + SubscriptionState, ConsumerRebalanceListener) +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.consumer import ConsumerCoordinator +from kafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) +from kafka.conn import ConnectionStates +from kafka.future import Future +from kafka.protocol.commit import ( + OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2, + OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1, + OffsetFetchResponse) +from kafka.protocol.metadata import MetadataResponse + +import kafka.common as Errors + + +@pytest.fixture +def conn(mocker): + conn = mocker.patch('kafka.client_async.BrokerConnection') + conn.return_value = conn + conn.state = ConnectionStates.CONNECTED + conn.send.return_value = Future().success( + MetadataResponse( + [(0, 'foo', 12), (1, 'bar', 34)], # brokers + [])) # topics + return conn + + +@pytest.fixture +def coordinator(conn): + return ConsumerCoordinator(KafkaClient(), SubscriptionState()) + + +def test_init(conn): + cli = KafkaClient() + coordinator = ConsumerCoordinator(cli, SubscriptionState()) + + # metadata update on init + assert cli.cluster._need_update is True + assert coordinator._handle_metadata_update in cli.cluster._listeners + + +@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) +def test_autocommit_enable_api_version(conn, api_version): + coordinator = ConsumerCoordinator( + KafkaClient(), SubscriptionState(), api_version=api_version) + if api_version < (0, 8, 1): + assert coordinator._auto_commit_task is None + else: + assert coordinator._auto_commit_task is not None + + +def test_protocol_type(coordinator): + assert coordinator.protocol_type() is 'consumer' + + +def test_group_protocols(coordinator): + # Requires a subscription + try: + coordinator.group_protocols() + except AssertionError: + pass + else: + assert False, 'Exception not raised when expected' + + coordinator._subscription.subscribe(topics=['foobar']) + assert coordinator.group_protocols() == [( + 'roundrobin', + ConsumerProtocolMemberMetadata( + RoundRobinPartitionAssignor.version, + ['foobar'], + b'') + )] + + +@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)]) +def test_pattern_subscription(coordinator, api_version): + coordinator.config['api_version'] = api_version + coordinator._subscription.subscribe(pattern='foo') + assert coordinator._subscription.subscription == set([]) + assert coordinator._subscription_metadata_changed() is False + assert coordinator._subscription.needs_partition_assignment is False + + cluster = coordinator._client.cluster + cluster.update_metadata(MetadataResponse( + # brokers + [(0, 'foo', 12), (1, 'bar', 34)], + # topics + [(0, 'fizz', []), + (0, 'foo1', [(0, 0, 0, [], [])]), + (0, 'foo2', [(0, 0, 1, [], [])])])) + assert coordinator._subscription.subscription == set(['foo1', 'foo2']) + + # 0.9 consumers should trigger dynamic partition assignment + if api_version >= (0, 9): + assert coordinator._subscription.needs_partition_assignment is True + assert coordinator._subscription.assignment == {} + + # earlier consumers get all partitions assigned locally + else: + assert coordinator._subscription.needs_partition_assignment is False + assert set(coordinator._subscription.assignment.keys()) == set([ + TopicPartition('foo1', 0), + TopicPartition('foo2', 0)]) + + +def test_lookup_assignor(coordinator): + assignor = coordinator._lookup_assignor('roundrobin') + assert assignor is RoundRobinPartitionAssignor + assert coordinator._lookup_assignor('foobar') is None + + +def test_join_complete(mocker, coordinator): + coordinator._subscription.subscribe(topics=['foobar']) + assignor = RoundRobinPartitionAssignor() + coordinator.config['assignors'] = (assignor,) + mocker.spy(assignor, 'on_assignment') + assert assignor.on_assignment.call_count == 0 + assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + coordinator._on_join_complete( + 0, 'member-foo', 'roundrobin', assignment.encode()) + assert assignor.on_assignment.call_count == 1 + assignor.on_assignment.assert_called_with(assignment) + + +def test_subscription_listener(mocker, coordinator): + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coordinator._subscription.subscribe( + topics=['foobar'], + listener=listener) + + coordinator._on_join_prepare(0, 'member-foo') + assert listener.on_partitions_revoked.call_count == 1 + listener.on_partitions_revoked.assert_called_with(set([])) + + assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + coordinator._on_join_complete( + 0, 'member-foo', 'roundrobin', assignment.encode()) + assert listener.on_partitions_assigned.call_count == 1 + listener.on_partitions_assigned.assert_called_with(set([ + TopicPartition('foobar', 0), + TopicPartition('foobar', 1)])) + + +def test_subscription_listener_failure(mocker, coordinator): + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coordinator._subscription.subscribe( + topics=['foobar'], + listener=listener) + + # exception raised in listener should not be re-raised by coordinator + listener.on_partitions_revoked.side_effect = Exception('crash') + coordinator._on_join_prepare(0, 'member-foo') + assert listener.on_partitions_revoked.call_count == 1 + + assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + coordinator._on_join_complete( + 0, 'member-foo', 'roundrobin', assignment.encode()) + assert listener.on_partitions_assigned.call_count == 1 + + +def test_perform_assignment(mocker, coordinator): + member_metadata = { + 'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''), + 'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'') + } + assignments = { + 'member-foo': ConsumerProtocolMemberAssignment( + 0, [('foo1', [0])], b''), + 'member-bar': ConsumerProtocolMemberAssignment( + 0, [('foo1', [1])], b'') + } + + mocker.patch.object(RoundRobinPartitionAssignor, 'assign') + RoundRobinPartitionAssignor.assign.return_value = assignments + + ret = coordinator._perform_assignment( + 'member-foo', 'roundrobin', + [(member, metadata.encode()) + for member, metadata in member_metadata.items()]) + + assert RoundRobinPartitionAssignor.assign.call_count == 1 + RoundRobinPartitionAssignor.assign.assert_called_with( + coordinator._client.cluster, member_metadata) + assert ret == assignments + + +def test_on_join_prepare(coordinator): + coordinator._subscription.subscribe(topics=['foobar']) + coordinator._on_join_prepare(0, 'member-foo') + assert coordinator._subscription.needs_partition_assignment is True + + +def test_need_rejoin(coordinator): + # No subscription - no rejoin + assert coordinator.need_rejoin() is False + + coordinator._subscription.subscribe(topics=['foobar']) + assert coordinator.need_rejoin() is True + + coordinator._subscription.needs_partition_assignment = False + coordinator.rejoin_needed = False + assert coordinator.need_rejoin() is False + + coordinator._subscription.needs_partition_assignment = True + assert coordinator.need_rejoin() is True + + +def test_refresh_committed_offsets_if_needed(mocker, coordinator): + mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', + return_value = { + TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), + TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')}) + coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) + assert coordinator._subscription.needs_fetch_committed_offsets is True + coordinator.refresh_committed_offsets_if_needed() + assignment = coordinator._subscription.assignment + assert assignment[TopicPartition('foobar', 0)].committed == 123 + assert TopicPartition('foobar', 1) not in assignment + assert coordinator._subscription.needs_fetch_committed_offsets is False + + +def test_fetch_committed_offsets(mocker, coordinator): + + # No partitions, no IO polling + mocker.patch.object(coordinator._client, 'poll') + assert coordinator.fetch_committed_offsets([]) == {} + assert coordinator._client.poll.call_count == 0 + + # general case -- send offset fetch request, get successful future + mocker.patch.object(coordinator, 'ensure_coordinator_known') + mocker.patch.object(coordinator, '_send_offset_fetch_request', + return_value=Future().success('foobar')) + partitions = [TopicPartition('foobar', 0)] + ret = coordinator.fetch_committed_offsets(partitions) + assert ret == 'foobar' + coordinator._send_offset_fetch_request.assert_called_with(partitions) + assert coordinator._client.poll.call_count == 1 + + # Failed future is raised if not retriable + coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError) + coordinator._client.poll.reset_mock() + try: + coordinator.fetch_committed_offsets(partitions) + except AssertionError: + pass + else: + assert False, 'Exception not raised when expected' + assert coordinator._client.poll.call_count == 1 + + coordinator._client.poll.reset_mock() + coordinator._send_offset_fetch_request.side_effect = [ + Future().failure(Errors.RequestTimedOutError), + Future().success('fizzbuzz')] + + ret = coordinator.fetch_committed_offsets(partitions) + assert ret == 'fizzbuzz' + assert coordinator._client.poll.call_count == 2 # call + retry + + +def test_close(mocker, coordinator): + mocker.patch.object(coordinator, '_maybe_auto_commit_offsets_sync') + mocker.patch.object(coordinator, '_handle_leave_group_response') + coordinator.coordinator_id = 0 + coordinator.generation = 1 + cli = coordinator._client + mocker.patch.object(cli, 'unschedule') + mocker.patch.object(cli, 'send', return_value=Future().success('foobar')) + mocker.patch.object(cli, 'poll') + + coordinator.close() + assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1 + cli.unschedule.assert_called_with(coordinator.heartbeat_task) + coordinator._handle_leave_group_response.assert_called_with('foobar') + + assert coordinator.generation == -1 + assert coordinator.member_id == '' + assert coordinator.rejoin_needed is True + + +@pytest.fixture +def offsets(): + return { + TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), + TopicPartition('foobar', 1): OffsetAndMetadata(234, b''), + } + + +def test_commit_offsets_async(mocker, coordinator, offsets): + mocker.patch.object(coordinator._client, 'poll') + mocker.patch.object(coordinator, 'ensure_coordinator_known') + mocker.patch.object(coordinator, '_send_offset_commit_request', + return_value=Future().success('fizzbuzz')) + ret = coordinator.commit_offsets_async(offsets) + assert isinstance(ret, Future) + assert coordinator._send_offset_commit_request.call_count == 1 + + +def test_commit_offsets_sync(mocker, coordinator, offsets): + mocker.patch.object(coordinator, 'ensure_coordinator_known') + mocker.patch.object(coordinator, '_send_offset_commit_request', + return_value=Future().success('fizzbuzz')) + cli = coordinator._client + mocker.patch.object(cli, 'poll') + + # No offsets, no calls + assert coordinator.commit_offsets_sync({}) is None + assert coordinator._send_offset_commit_request.call_count == 0 + assert cli.poll.call_count == 0 + + ret = coordinator.commit_offsets_sync(offsets) + assert coordinator._send_offset_commit_request.call_count == 1 + assert cli.poll.call_count == 1 + assert ret == 'fizzbuzz' + + # Failed future is raised if not retriable + coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError) + coordinator._client.poll.reset_mock() + try: + coordinator.commit_offsets_sync(offsets) + except AssertionError: + pass + else: + assert False, 'Exception not raised when expected' + assert coordinator._client.poll.call_count == 1 + + coordinator._client.poll.reset_mock() + coordinator._send_offset_commit_request.side_effect = [ + Future().failure(Errors.RequestTimedOutError), + Future().success('fizzbuzz')] + + ret = coordinator.commit_offsets_sync(offsets) + assert ret == 'fizzbuzz' + assert coordinator._client.poll.call_count == 2 # call + retry + + +@pytest.mark.parametrize( + 'api_version,enable,error,task_disable,commit_offsets,warn,exc', [ + ((0, 8), True, None, False, False, False, False), + ((0, 9), False, None, False, False, False, False), + ((0, 9), True, Errors.UnknownMemberIdError(), True, True, True, False), + ((0, 9), True, Errors.IllegalGenerationError(), True, True, True, False), + ((0, 9), True, Errors.RebalanceInProgressError(), True, True, True, False), + ((0, 9), True, Exception(), True, True, False, True), + ((0, 9), True, None, True, True, False, False), + ]) +def test_maybe_auto_commit_offsets_sync(mocker, coordinator, + api_version, enable, error, task_disable, + commit_offsets, warn, exc): + auto_commit_task = mocker.patch.object(coordinator, '_auto_commit_task') + commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync', + side_effect=error) + mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') + mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') + + coordinator.config['api_version'] = api_version + coordinator.config['enable_auto_commit'] = enable + assert coordinator._maybe_auto_commit_offsets_sync() is None + assert auto_commit_task.disable.call_count == (1 if task_disable else 0) + assert commit_sync.call_count == (1 if commit_offsets else 0) + assert mock_warn.call_count == (1 if warn else 0) + assert mock_exc.call_count == (1 if exc else 0) + + +@pytest.fixture +def patched_coord(mocker, coordinator): + coordinator._subscription.subscribe(topics=['foobar']) + coordinator._subscription.needs_partition_assignment = False + mocker.patch.object(coordinator, 'coordinator_unknown') + coordinator.coordinator_unknown.return_value = False + coordinator.coordinator_id = 0 + mocker.patch.object(coordinator._client, 'least_loaded_node', + return_value=1) + mocker.patch.object(coordinator._client, 'send') + mocker.spy(coordinator, '_failed_request') + mocker.spy(coordinator, '_handle_offset_commit_response') + mocker.spy(coordinator, '_handle_offset_fetch_response') + return coordinator + + +def test_send_offset_commit_request_fail(patched_coord, offsets): + patched_coord.coordinator_unknown.return_value = True + patched_coord.coordinator_id = None + + # No offsets + ret = patched_coord._send_offset_commit_request({}) + assert isinstance(ret, Future) + assert ret.succeeded() + + # No coordinator + ret = patched_coord._send_offset_commit_request(offsets) + assert ret.failed() + assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) + + +@pytest.mark.parametrize('api_version,req_type', [ + ((0, 8, 1), OffsetCommitRequest_v0), + ((0, 8, 2), OffsetCommitRequest_v1), + ((0, 9), OffsetCommitRequest_v2)]) +def test_send_offset_commit_request_versions(patched_coord, offsets, + api_version, req_type): + # assuming fixture sets coordinator=0, least_loaded_node=1 + expect_node = 0 if api_version >= (0, 8, 2) else 1 + patched_coord.config['api_version'] = api_version + + patched_coord._send_offset_commit_request(offsets) + (node, request), _ = patched_coord._client.send.call_args + assert node == expect_node, 'Unexpected coordinator node' + assert isinstance(request, req_type) + + +def test_send_offset_commit_request_failure(patched_coord, offsets): + _f = Future() + patched_coord._client.send.return_value = _f + future = patched_coord._send_offset_commit_request(offsets) + (node, request), _ = patched_coord._client.send.call_args + error = Exception() + _f.failure(error) + patched_coord._failed_request.assert_called_with(0, request, future, error) + assert future.failed() + assert future.exception is error + + +def test_send_offset_commit_request_success(patched_coord, offsets): + _f = Future() + patched_coord._client.send.return_value = _f + future = patched_coord._send_offset_commit_request(offsets) + (node, request), _ = patched_coord._client.send.call_args + response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])]) + _f.success(response) + patched_coord._handle_offset_commit_response.assert_called_with( + offsets, future, response) + + +@pytest.mark.parametrize('response,error,dead,reassign', [ + (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]), + Errors.GroupAuthorizationFailedError, False, False), + (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]), + Errors.OffsetMetadataTooLargeError, False, False), + (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]), + Errors.InvalidCommitOffsetSizeError, False, False), + (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]), + Errors.GroupLoadInProgressError, False, False), + (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]), + Errors.GroupCoordinatorNotAvailableError, True, False), + (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]), + Errors.NotCoordinatorForGroupError, True, False), + (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]), + Errors.RequestTimedOutError, True, False), + (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]), + Errors.UnknownMemberIdError, False, True), + (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]), + Errors.IllegalGenerationError, False, True), + (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]), + Errors.RebalanceInProgressError, False, True), + (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]), + Errors.InvalidTopicError, False, False), + (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]), + Errors.TopicAuthorizationFailedError, False, False), +]) +def test_handle_offset_commit_response(patched_coord, offsets, + response, error, dead, reassign): + future = Future() + patched_coord._handle_offset_commit_response(offsets, future, response) + assert isinstance(future.exception, error) + assert patched_coord.coordinator_id is (None if dead else 0) + assert patched_coord._subscription.needs_partition_assignment is reassign + + +@pytest.fixture +def partitions(): + return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)] + + +def test_send_offset_fetch_request_fail(patched_coord, partitions): + patched_coord.coordinator_unknown.return_value = True + patched_coord.coordinator_id = None + + # No partitions + ret = patched_coord._send_offset_fetch_request([]) + assert isinstance(ret, Future) + assert ret.succeeded() + assert ret.value == {} + + # No coordinator + ret = patched_coord._send_offset_fetch_request(partitions) + assert ret.failed() + assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) + + +@pytest.mark.parametrize('api_version,req_type', [ + ((0, 8, 1), OffsetFetchRequest_v0), + ((0, 8, 2), OffsetFetchRequest_v1), + ((0, 9), OffsetFetchRequest_v1)]) +def test_send_offset_fetch_request_versions(patched_coord, partitions, + api_version, req_type): + # assuming fixture sets coordinator=0, least_loaded_node=1 + expect_node = 0 if api_version >= (0, 8, 2) else 1 + patched_coord.config['api_version'] = api_version + + patched_coord._send_offset_fetch_request(partitions) + (node, request), _ = patched_coord._client.send.call_args + assert node == expect_node, 'Unexpected coordinator node' + assert isinstance(request, req_type) + + +def test_send_offset_fetch_request_failure(patched_coord, partitions): + _f = Future() + patched_coord._client.send.return_value = _f + future = patched_coord._send_offset_fetch_request(partitions) + (node, request), _ = patched_coord._client.send.call_args + error = Exception() + _f.failure(error) + patched_coord._failed_request.assert_called_with(0, request, future, error) + assert future.failed() + assert future.exception is error + + +def test_send_offset_fetch_request_success(patched_coord, partitions): + _f = Future() + patched_coord._client.send.return_value = _f + future = patched_coord._send_offset_fetch_request(partitions) + (node, request), _ = patched_coord._client.send.call_args + response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])]) + _f.success(response) + patched_coord._handle_offset_fetch_response.assert_called_with( + future, response) + + +@pytest.mark.parametrize('response,error,dead,reassign', [ + #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), + # Errors.GroupAuthorizationFailedError, False, False), + #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), + # Errors.RequestTimedOutError, True, False), + #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), + # Errors.RebalanceInProgressError, False, True), + (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), + Errors.GroupLoadInProgressError, False, False), + (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), + Errors.NotCoordinatorForGroupError, True, False), + (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + Errors.UnknownMemberIdError, False, True), + (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + Errors.IllegalGenerationError, False, True), + (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), + Errors.TopicAuthorizationFailedError, False, False), + (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + None, False, False), +]) +def test_handle_offset_fetch_response(patched_coord, offsets, + response, error, dead, reassign): + future = Future() + patched_coord._handle_offset_fetch_response(future, response) + if error is not None: + assert isinstance(future.exception, error) + else: + assert future.succeeded() + assert future.value == offsets + assert patched_coord.coordinator_id is (None if dead else 0) + assert patched_coord._subscription.needs_partition_assignment is reassign |