summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-02-02 16:36:30 -0800
committerGitHub <noreply@github.com>2018-02-02 16:36:30 -0800
commit618c5051493693c1305aa9f08e8a0583d5fcf0e3 (patch)
tree3a2fcec8260915a83f19a603671c4a0e5461cca0 /test
parent08a7fb7b754a754c6c64e96d4ba5c4f56cf38a5f (diff)
downloadkafka-python-618c5051493693c1305aa9f08e8a0583d5fcf0e3.tar.gz
KAFKA-3949: Avoid race condition when subscription changes during rebalance (#1364)
Diffstat (limited to 'test')
-rw-r--r--test/test_coordinator.py58
1 files changed, 21 insertions, 37 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index e094b9c..7a2627e 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -62,7 +62,7 @@ def test_group_protocols(coordinator):
# Requires a subscription
try:
coordinator.group_protocols()
- except AssertionError:
+ except Errors.IllegalStateError:
pass
else:
assert False, 'Exception not raised when expected'
@@ -85,8 +85,7 @@ def test_pattern_subscription(coordinator, api_version):
coordinator.config['api_version'] = api_version
coordinator._subscription.subscribe(pattern='foo')
assert coordinator._subscription.subscription == set([])
- assert coordinator._subscription_metadata_changed({}) is False
- assert coordinator._subscription.needs_partition_assignment is False
+ assert coordinator._metadata_snapshot == coordinator._build_metadata_snapshot(coordinator._subscription, {})
cluster = coordinator._client.cluster
cluster.update_metadata(MetadataResponse[0](
@@ -100,12 +99,10 @@ def test_pattern_subscription(coordinator, api_version):
# 0.9 consumers should trigger dynamic partition assignment
if api_version >= (0, 9):
- assert coordinator._subscription.needs_partition_assignment is True
assert coordinator._subscription.assignment == {}
# earlier consumers get all partitions assigned locally
else:
- assert coordinator._subscription.needs_partition_assignment is False
assert set(coordinator._subscription.assignment.keys()) == set([
TopicPartition('foo1', 0),
TopicPartition('foo2', 0)])
@@ -195,7 +192,6 @@ def test_perform_assignment(mocker, coordinator):
def test_on_join_prepare(coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
coordinator._on_join_prepare(0, 'member-foo')
- assert coordinator._subscription.needs_partition_assignment is True
def test_need_rejoin(coordinator):
@@ -205,13 +201,6 @@ def test_need_rejoin(coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
assert coordinator.need_rejoin() is True
- coordinator._subscription.needs_partition_assignment = False
- coordinator.rejoin_needed = False
- assert coordinator.need_rejoin() is False
-
- coordinator._subscription.needs_partition_assignment = True
- assert coordinator.need_rejoin() is True
-
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
@@ -388,7 +377,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
@pytest.fixture
def patched_coord(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
- coordinator._subscription.needs_partition_assignment = False
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
coordinator.coordinator_id = 0
mocker.patch.object(coordinator, 'coordinator', return_value=0)
@@ -461,47 +449,39 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
offsets, future, mocker.ANY, response)
-@pytest.mark.parametrize('response,error,dead,reassign', [
+@pytest.mark.parametrize('response,error,dead', [
(OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]),
- Errors.GroupAuthorizationFailedError, False, False),
+ Errors.GroupAuthorizationFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]),
- Errors.OffsetMetadataTooLargeError, False, False),
+ Errors.OffsetMetadataTooLargeError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
- Errors.InvalidCommitOffsetSizeError, False, False),
+ Errors.InvalidCommitOffsetSizeError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
- Errors.GroupLoadInProgressError, False, False),
+ Errors.GroupLoadInProgressError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
- Errors.GroupCoordinatorNotAvailableError, True, False),
+ Errors.GroupCoordinatorNotAvailableError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
- Errors.NotCoordinatorForGroupError, True, False),
+ Errors.NotCoordinatorForGroupError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
- Errors.RequestTimedOutError, True, False),
+ Errors.RequestTimedOutError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
- Errors.CommitFailedError, False, True),
+ Errors.CommitFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]),
- Errors.CommitFailedError, False, True),
+ Errors.CommitFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]),
- Errors.CommitFailedError, False, True),
+ Errors.CommitFailedError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]),
- Errors.InvalidTopicError, False, False),
+ Errors.InvalidTopicError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
- Errors.TopicAuthorizationFailedError, False, False),
+ Errors.TopicAuthorizationFailedError, False),
])
def test_handle_offset_commit_response(mocker, patched_coord, offsets,
- response, error, dead, reassign):
+ response, error, dead):
future = Future()
patched_coord._handle_offset_commit_response(offsets, future, time.time(),
response)
assert isinstance(future.exception, error)
assert patched_coord.coordinator_id is (None if dead else 0)
- if reassign:
- assert patched_coord._generation is Generation.NO_GENERATION
- assert patched_coord.rejoin_needed is True
- assert patched_coord.state is MemberState.UNJOINED
- else:
- assert patched_coord._generation is not Generation.NO_GENERATION
- assert patched_coord.rejoin_needed is False
- assert patched_coord.state is MemberState.STABLE
@pytest.fixture
@@ -570,6 +550,10 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
Errors.GroupLoadInProgressError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
Errors.NotCoordinatorForGroupError, True),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
+ Errors.UnknownMemberIdError, False),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
+ Errors.IllegalGenerationError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
Errors.TopicAuthorizationFailedError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
@@ -627,7 +611,7 @@ def test_ensure_active_group(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True))
- mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, True, False])
+ mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, False])
mocker.patch.object(coordinator, '_on_join_complete')
mocker.patch.object(coordinator, '_heartbeat_thread')