summaryrefslogtreecommitdiff
path: root/test/test_coordinator.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 03:15:47 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 11:10:23 -0800
commit240f7029def4027bfccde7b8627c978ab1fdd5a6 (patch)
treec1a177029082b792a76d4893734ce5830992dbe3 /test/test_coordinator.py
parentbbd6444e85a3062224a977f1033da3f393110b87 (diff)
downloadkafka-python-240f7029def4027bfccde7b8627c978ab1fdd5a6.tar.gz
Add ConsumerCoordinator unit tests
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r--test/test_coordinator.py568
1 files changed, 568 insertions, 0 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
new file mode 100644
index 0000000..f7c5772
--- /dev/null
+++ b/test/test_coordinator.py
@@ -0,0 +1,568 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.client_async import KafkaClient
+from kafka.common import TopicPartition, OffsetAndMetadata
+from kafka.consumer.subscription_state import (
+ SubscriptionState, ConsumerRebalanceListener)
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.protocol import (
+ ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
+from kafka.conn import ConnectionStates
+from kafka.future import Future
+from kafka.protocol.commit import (
+ OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
+ OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
+ OffsetFetchResponse)
+from kafka.protocol.metadata import MetadataResponse
+
+import kafka.common as Errors
+
+
+@pytest.fixture
+def conn(mocker):
+ conn = mocker.patch('kafka.client_async.BrokerConnection')
+ conn.return_value = conn
+ conn.state = ConnectionStates.CONNECTED
+ conn.send.return_value = Future().success(
+ MetadataResponse(
+ [(0, 'foo', 12), (1, 'bar', 34)], # brokers
+ [])) # topics
+ return conn
+
+
+@pytest.fixture
+def coordinator(conn):
+ return ConsumerCoordinator(KafkaClient(), SubscriptionState())
+
+
+def test_init(conn):
+ cli = KafkaClient()
+ coordinator = ConsumerCoordinator(cli, SubscriptionState())
+
+ # metadata update on init
+ assert cli.cluster._need_update is True
+ assert coordinator._handle_metadata_update in cli.cluster._listeners
+
+
+@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
+def test_autocommit_enable_api_version(conn, api_version):
+ coordinator = ConsumerCoordinator(
+ KafkaClient(), SubscriptionState(), api_version=api_version)
+ if api_version < (0, 8, 1):
+ assert coordinator._auto_commit_task is None
+ else:
+ assert coordinator._auto_commit_task is not None
+
+
+def test_protocol_type(coordinator):
+ assert coordinator.protocol_type() is 'consumer'
+
+
+def test_group_protocols(coordinator):
+ # Requires a subscription
+ try:
+ coordinator.group_protocols()
+ except AssertionError:
+ pass
+ else:
+ assert False, 'Exception not raised when expected'
+
+ coordinator._subscription.subscribe(topics=['foobar'])
+ assert coordinator.group_protocols() == [(
+ 'roundrobin',
+ ConsumerProtocolMemberMetadata(
+ RoundRobinPartitionAssignor.version,
+ ['foobar'],
+ b'')
+ )]
+
+
+@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
+def test_pattern_subscription(coordinator, api_version):
+ coordinator.config['api_version'] = api_version
+ coordinator._subscription.subscribe(pattern='foo')
+ assert coordinator._subscription.subscription == set([])
+ assert coordinator._subscription_metadata_changed() is False
+ assert coordinator._subscription.needs_partition_assignment is False
+
+ cluster = coordinator._client.cluster
+ cluster.update_metadata(MetadataResponse(
+ # brokers
+ [(0, 'foo', 12), (1, 'bar', 34)],
+ # topics
+ [(0, 'fizz', []),
+ (0, 'foo1', [(0, 0, 0, [], [])]),
+ (0, 'foo2', [(0, 0, 1, [], [])])]))
+ assert coordinator._subscription.subscription == set(['foo1', 'foo2'])
+
+ # 0.9 consumers should trigger dynamic partition assignment
+ if api_version >= (0, 9):
+ assert coordinator._subscription.needs_partition_assignment is True
+ assert coordinator._subscription.assignment == {}
+
+ # earlier consumers get all partitions assigned locally
+ else:
+ assert coordinator._subscription.needs_partition_assignment is False
+ assert set(coordinator._subscription.assignment.keys()) == set([
+ TopicPartition('foo1', 0),
+ TopicPartition('foo2', 0)])
+
+
+def test_lookup_assignor(coordinator):
+ assignor = coordinator._lookup_assignor('roundrobin')
+ assert assignor is RoundRobinPartitionAssignor
+ assert coordinator._lookup_assignor('foobar') is None
+
+
+def test_join_complete(mocker, coordinator):
+ coordinator._subscription.subscribe(topics=['foobar'])
+ assignor = RoundRobinPartitionAssignor()
+ coordinator.config['assignors'] = (assignor,)
+ mocker.spy(assignor, 'on_assignment')
+ assert assignor.on_assignment.call_count == 0
+ assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+ coordinator._on_join_complete(
+ 0, 'member-foo', 'roundrobin', assignment.encode())
+ assert assignor.on_assignment.call_count == 1
+ assignor.on_assignment.assert_called_with(assignment)
+
+
+def test_subscription_listener(mocker, coordinator):
+ listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
+ coordinator._subscription.subscribe(
+ topics=['foobar'],
+ listener=listener)
+
+ coordinator._on_join_prepare(0, 'member-foo')
+ assert listener.on_partitions_revoked.call_count == 1
+ listener.on_partitions_revoked.assert_called_with(set([]))
+
+ assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+ coordinator._on_join_complete(
+ 0, 'member-foo', 'roundrobin', assignment.encode())
+ assert listener.on_partitions_assigned.call_count == 1
+ listener.on_partitions_assigned.assert_called_with(set([
+ TopicPartition('foobar', 0),
+ TopicPartition('foobar', 1)]))
+
+
+def test_subscription_listener_failure(mocker, coordinator):
+ listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
+ coordinator._subscription.subscribe(
+ topics=['foobar'],
+ listener=listener)
+
+ # exception raised in listener should not be re-raised by coordinator
+ listener.on_partitions_revoked.side_effect = Exception('crash')
+ coordinator._on_join_prepare(0, 'member-foo')
+ assert listener.on_partitions_revoked.call_count == 1
+
+ assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+ coordinator._on_join_complete(
+ 0, 'member-foo', 'roundrobin', assignment.encode())
+ assert listener.on_partitions_assigned.call_count == 1
+
+
+def test_perform_assignment(mocker, coordinator):
+ member_metadata = {
+ 'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''),
+ 'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'')
+ }
+ assignments = {
+ 'member-foo': ConsumerProtocolMemberAssignment(
+ 0, [('foo1', [0])], b''),
+ 'member-bar': ConsumerProtocolMemberAssignment(
+ 0, [('foo1', [1])], b'')
+ }
+
+ mocker.patch.object(RoundRobinPartitionAssignor, 'assign')
+ RoundRobinPartitionAssignor.assign.return_value = assignments
+
+ ret = coordinator._perform_assignment(
+ 'member-foo', 'roundrobin',
+ [(member, metadata.encode())
+ for member, metadata in member_metadata.items()])
+
+ assert RoundRobinPartitionAssignor.assign.call_count == 1
+ RoundRobinPartitionAssignor.assign.assert_called_with(
+ coordinator._client.cluster, member_metadata)
+ assert ret == assignments
+
+
+def test_on_join_prepare(coordinator):
+ coordinator._subscription.subscribe(topics=['foobar'])
+ coordinator._on_join_prepare(0, 'member-foo')
+ assert coordinator._subscription.needs_partition_assignment is True
+
+
+def test_need_rejoin(coordinator):
+ # No subscription - no rejoin
+ assert coordinator.need_rejoin() is False
+
+ coordinator._subscription.subscribe(topics=['foobar'])
+ assert coordinator.need_rejoin() is True
+
+ coordinator._subscription.needs_partition_assignment = False
+ coordinator.rejoin_needed = False
+ assert coordinator.need_rejoin() is False
+
+ coordinator._subscription.needs_partition_assignment = True
+ assert coordinator.need_rejoin() is True
+
+
+def test_refresh_committed_offsets_if_needed(mocker, coordinator):
+ mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
+ return_value = {
+ TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
+ TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')})
+ coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)])
+ assert coordinator._subscription.needs_fetch_committed_offsets is True
+ coordinator.refresh_committed_offsets_if_needed()
+ assignment = coordinator._subscription.assignment
+ assert assignment[TopicPartition('foobar', 0)].committed == 123
+ assert TopicPartition('foobar', 1) not in assignment
+ assert coordinator._subscription.needs_fetch_committed_offsets is False
+
+
+def test_fetch_committed_offsets(mocker, coordinator):
+
+ # No partitions, no IO polling
+ mocker.patch.object(coordinator._client, 'poll')
+ assert coordinator.fetch_committed_offsets([]) == {}
+ assert coordinator._client.poll.call_count == 0
+
+ # general case -- send offset fetch request, get successful future
+ mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, '_send_offset_fetch_request',
+ return_value=Future().success('foobar'))
+ partitions = [TopicPartition('foobar', 0)]
+ ret = coordinator.fetch_committed_offsets(partitions)
+ assert ret == 'foobar'
+ coordinator._send_offset_fetch_request.assert_called_with(partitions)
+ assert coordinator._client.poll.call_count == 1
+
+ # Failed future is raised if not retriable
+ coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError)
+ coordinator._client.poll.reset_mock()
+ try:
+ coordinator.fetch_committed_offsets(partitions)
+ except AssertionError:
+ pass
+ else:
+ assert False, 'Exception not raised when expected'
+ assert coordinator._client.poll.call_count == 1
+
+ coordinator._client.poll.reset_mock()
+ coordinator._send_offset_fetch_request.side_effect = [
+ Future().failure(Errors.RequestTimedOutError),
+ Future().success('fizzbuzz')]
+
+ ret = coordinator.fetch_committed_offsets(partitions)
+ assert ret == 'fizzbuzz'
+ assert coordinator._client.poll.call_count == 2 # call + retry
+
+
+def test_close(mocker, coordinator):
+ mocker.patch.object(coordinator, '_maybe_auto_commit_offsets_sync')
+ mocker.patch.object(coordinator, '_handle_leave_group_response')
+ coordinator.coordinator_id = 0
+ coordinator.generation = 1
+ cli = coordinator._client
+ mocker.patch.object(cli, 'unschedule')
+ mocker.patch.object(cli, 'send', return_value=Future().success('foobar'))
+ mocker.patch.object(cli, 'poll')
+
+ coordinator.close()
+ assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
+ cli.unschedule.assert_called_with(coordinator.heartbeat_task)
+ coordinator._handle_leave_group_response.assert_called_with('foobar')
+
+ assert coordinator.generation == -1
+ assert coordinator.member_id == ''
+ assert coordinator.rejoin_needed is True
+
+
+@pytest.fixture
+def offsets():
+ return {
+ TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
+ TopicPartition('foobar', 1): OffsetAndMetadata(234, b''),
+ }
+
+
+def test_commit_offsets_async(mocker, coordinator, offsets):
+ mocker.patch.object(coordinator._client, 'poll')
+ mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, '_send_offset_commit_request',
+ return_value=Future().success('fizzbuzz'))
+ ret = coordinator.commit_offsets_async(offsets)
+ assert isinstance(ret, Future)
+ assert coordinator._send_offset_commit_request.call_count == 1
+
+
+def test_commit_offsets_sync(mocker, coordinator, offsets):
+ mocker.patch.object(coordinator, 'ensure_coordinator_known')
+ mocker.patch.object(coordinator, '_send_offset_commit_request',
+ return_value=Future().success('fizzbuzz'))
+ cli = coordinator._client
+ mocker.patch.object(cli, 'poll')
+
+ # No offsets, no calls
+ assert coordinator.commit_offsets_sync({}) is None
+ assert coordinator._send_offset_commit_request.call_count == 0
+ assert cli.poll.call_count == 0
+
+ ret = coordinator.commit_offsets_sync(offsets)
+ assert coordinator._send_offset_commit_request.call_count == 1
+ assert cli.poll.call_count == 1
+ assert ret == 'fizzbuzz'
+
+ # Failed future is raised if not retriable
+ coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError)
+ coordinator._client.poll.reset_mock()
+ try:
+ coordinator.commit_offsets_sync(offsets)
+ except AssertionError:
+ pass
+ else:
+ assert False, 'Exception not raised when expected'
+ assert coordinator._client.poll.call_count == 1
+
+ coordinator._client.poll.reset_mock()
+ coordinator._send_offset_commit_request.side_effect = [
+ Future().failure(Errors.RequestTimedOutError),
+ Future().success('fizzbuzz')]
+
+ ret = coordinator.commit_offsets_sync(offsets)
+ assert ret == 'fizzbuzz'
+ assert coordinator._client.poll.call_count == 2 # call + retry
+
+
+@pytest.mark.parametrize(
+ 'api_version,enable,error,task_disable,commit_offsets,warn,exc', [
+ ((0, 8), True, None, False, False, False, False),
+ ((0, 9), False, None, False, False, False, False),
+ ((0, 9), True, Errors.UnknownMemberIdError(), True, True, True, False),
+ ((0, 9), True, Errors.IllegalGenerationError(), True, True, True, False),
+ ((0, 9), True, Errors.RebalanceInProgressError(), True, True, True, False),
+ ((0, 9), True, Exception(), True, True, False, True),
+ ((0, 9), True, None, True, True, False, False),
+ ])
+def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
+ api_version, enable, error, task_disable,
+ commit_offsets, warn, exc):
+ auto_commit_task = mocker.patch.object(coordinator, '_auto_commit_task')
+ commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
+ side_effect=error)
+ mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
+ mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
+
+ coordinator.config['api_version'] = api_version
+ coordinator.config['enable_auto_commit'] = enable
+ assert coordinator._maybe_auto_commit_offsets_sync() is None
+ assert auto_commit_task.disable.call_count == (1 if task_disable else 0)
+ assert commit_sync.call_count == (1 if commit_offsets else 0)
+ assert mock_warn.call_count == (1 if warn else 0)
+ assert mock_exc.call_count == (1 if exc else 0)
+
+
+@pytest.fixture
+def patched_coord(mocker, coordinator):
+ coordinator._subscription.subscribe(topics=['foobar'])
+ coordinator._subscription.needs_partition_assignment = False
+ mocker.patch.object(coordinator, 'coordinator_unknown')
+ coordinator.coordinator_unknown.return_value = False
+ coordinator.coordinator_id = 0
+ mocker.patch.object(coordinator._client, 'least_loaded_node',
+ return_value=1)
+ mocker.patch.object(coordinator._client, 'send')
+ mocker.spy(coordinator, '_failed_request')
+ mocker.spy(coordinator, '_handle_offset_commit_response')
+ mocker.spy(coordinator, '_handle_offset_fetch_response')
+ return coordinator
+
+
+def test_send_offset_commit_request_fail(patched_coord, offsets):
+ patched_coord.coordinator_unknown.return_value = True
+ patched_coord.coordinator_id = None
+
+ # No offsets
+ ret = patched_coord._send_offset_commit_request({})
+ assert isinstance(ret, Future)
+ assert ret.succeeded()
+
+ # No coordinator
+ ret = patched_coord._send_offset_commit_request(offsets)
+ assert ret.failed()
+ assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
+
+
+@pytest.mark.parametrize('api_version,req_type', [
+ ((0, 8, 1), OffsetCommitRequest_v0),
+ ((0, 8, 2), OffsetCommitRequest_v1),
+ ((0, 9), OffsetCommitRequest_v2)])
+def test_send_offset_commit_request_versions(patched_coord, offsets,
+ api_version, req_type):
+ # assuming fixture sets coordinator=0, least_loaded_node=1
+ expect_node = 0 if api_version >= (0, 8, 2) else 1
+ patched_coord.config['api_version'] = api_version
+
+ patched_coord._send_offset_commit_request(offsets)
+ (node, request), _ = patched_coord._client.send.call_args
+ assert node == expect_node, 'Unexpected coordinator node'
+ assert isinstance(request, req_type)
+
+
+def test_send_offset_commit_request_failure(patched_coord, offsets):
+ _f = Future()
+ patched_coord._client.send.return_value = _f
+ future = patched_coord._send_offset_commit_request(offsets)
+ (node, request), _ = patched_coord._client.send.call_args
+ error = Exception()
+ _f.failure(error)
+ patched_coord._failed_request.assert_called_with(0, request, future, error)
+ assert future.failed()
+ assert future.exception is error
+
+
+def test_send_offset_commit_request_success(patched_coord, offsets):
+ _f = Future()
+ patched_coord._client.send.return_value = _f
+ future = patched_coord._send_offset_commit_request(offsets)
+ (node, request), _ = patched_coord._client.send.call_args
+ response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])])
+ _f.success(response)
+ patched_coord._handle_offset_commit_response.assert_called_with(
+ offsets, future, response)
+
+
+@pytest.mark.parametrize('response,error,dead,reassign', [
+ (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]),
+ Errors.GroupAuthorizationFailedError, False, False),
+ (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]),
+ Errors.OffsetMetadataTooLargeError, False, False),
+ (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]),
+ Errors.InvalidCommitOffsetSizeError, False, False),
+ (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]),
+ Errors.GroupLoadInProgressError, False, False),
+ (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]),
+ Errors.GroupCoordinatorNotAvailableError, True, False),
+ (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]),
+ Errors.NotCoordinatorForGroupError, True, False),
+ (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]),
+ Errors.RequestTimedOutError, True, False),
+ (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]),
+ Errors.UnknownMemberIdError, False, True),
+ (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]),
+ Errors.IllegalGenerationError, False, True),
+ (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]),
+ Errors.RebalanceInProgressError, False, True),
+ (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]),
+ Errors.InvalidTopicError, False, False),
+ (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]),
+ Errors.TopicAuthorizationFailedError, False, False),
+])
+def test_handle_offset_commit_response(patched_coord, offsets,
+ response, error, dead, reassign):
+ future = Future()
+ patched_coord._handle_offset_commit_response(offsets, future, response)
+ assert isinstance(future.exception, error)
+ assert patched_coord.coordinator_id is (None if dead else 0)
+ assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+@pytest.fixture
+def partitions():
+ return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]
+
+
+def test_send_offset_fetch_request_fail(patched_coord, partitions):
+ patched_coord.coordinator_unknown.return_value = True
+ patched_coord.coordinator_id = None
+
+ # No partitions
+ ret = patched_coord._send_offset_fetch_request([])
+ assert isinstance(ret, Future)
+ assert ret.succeeded()
+ assert ret.value == {}
+
+ # No coordinator
+ ret = patched_coord._send_offset_fetch_request(partitions)
+ assert ret.failed()
+ assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
+
+
+@pytest.mark.parametrize('api_version,req_type', [
+ ((0, 8, 1), OffsetFetchRequest_v0),
+ ((0, 8, 2), OffsetFetchRequest_v1),
+ ((0, 9), OffsetFetchRequest_v1)])
+def test_send_offset_fetch_request_versions(patched_coord, partitions,
+ api_version, req_type):
+ # assuming fixture sets coordinator=0, least_loaded_node=1
+ expect_node = 0 if api_version >= (0, 8, 2) else 1
+ patched_coord.config['api_version'] = api_version
+
+ patched_coord._send_offset_fetch_request(partitions)
+ (node, request), _ = patched_coord._client.send.call_args
+ assert node == expect_node, 'Unexpected coordinator node'
+ assert isinstance(request, req_type)
+
+
+def test_send_offset_fetch_request_failure(patched_coord, partitions):
+ _f = Future()
+ patched_coord._client.send.return_value = _f
+ future = patched_coord._send_offset_fetch_request(partitions)
+ (node, request), _ = patched_coord._client.send.call_args
+ error = Exception()
+ _f.failure(error)
+ patched_coord._failed_request.assert_called_with(0, request, future, error)
+ assert future.failed()
+ assert future.exception is error
+
+
+def test_send_offset_fetch_request_success(patched_coord, partitions):
+ _f = Future()
+ patched_coord._client.send.return_value = _f
+ future = patched_coord._send_offset_fetch_request(partitions)
+ (node, request), _ = patched_coord._client.send.call_args
+ response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])])
+ _f.success(response)
+ patched_coord._handle_offset_fetch_response.assert_called_with(
+ future, response)
+
+
+@pytest.mark.parametrize('response,error,dead,reassign', [
+ #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
+ # Errors.GroupAuthorizationFailedError, False, False),
+ #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
+ # Errors.RequestTimedOutError, True, False),
+ #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
+ # Errors.RebalanceInProgressError, False, True),
+ (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
+ Errors.GroupLoadInProgressError, False, False),
+ (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
+ Errors.NotCoordinatorForGroupError, True, False),
+ (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
+ Errors.UnknownMemberIdError, False, True),
+ (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
+ Errors.IllegalGenerationError, False, True),
+ (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
+ Errors.TopicAuthorizationFailedError, False, False),
+ (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
+ None, False, False),
+])
+def test_handle_offset_fetch_response(patched_coord, offsets,
+ response, error, dead, reassign):
+ future = Future()
+ patched_coord._handle_offset_fetch_response(future, response)
+ if error is not None:
+ assert isinstance(future.exception, error)
+ else:
+ assert future.succeeded()
+ assert future.value == offsets
+ assert patched_coord.coordinator_id is (None if dead else 0)
+ assert patched_coord._subscription.needs_partition_assignment is reassign