summaryrefslogtreecommitdiff
path: root/test/test_coordinator.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r--test/test_coordinator.py67
1 files changed, 33 insertions, 34 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 1dc7788..629b72f 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -16,9 +16,8 @@ from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (
- OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
- OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
- OffsetFetchResponse)
+ OffsetCommitRequest, OffsetCommitResponse,
+ OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
from kafka.util import WeakMethod
@@ -29,7 +28,7 @@ def conn(mocker):
conn.return_value = conn
conn.state = ConnectionStates.CONNECTED
conn.send.return_value = Future().success(
- MetadataResponse(
+ MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
return conn
@@ -98,7 +97,7 @@ def test_pattern_subscription(coordinator, api_version):
assert coordinator._subscription.needs_partition_assignment is False
cluster = coordinator._client.cluster
- cluster.update_metadata(MetadataResponse(
+ cluster.update_metadata(MetadataResponse[0](
# brokers
[(0, 'foo', 12), (1, 'bar', 34)],
# topics
@@ -428,9 +427,9 @@ def test_send_offset_commit_request_fail(patched_coord, offsets):
@pytest.mark.parametrize('api_version,req_type', [
- ((0, 8, 1), OffsetCommitRequest_v0),
- ((0, 8, 2), OffsetCommitRequest_v1),
- ((0, 9), OffsetCommitRequest_v2)])
+ ((0, 8, 1), OffsetCommitRequest[0]),
+ ((0, 8, 2), OffsetCommitRequest[1]),
+ ((0, 9), OffsetCommitRequest[2])])
def test_send_offset_commit_request_versions(patched_coord, offsets,
api_version, req_type):
# assuming fixture sets coordinator=0, least_loaded_node=1
@@ -460,36 +459,36 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_commit_request(offsets)
(node, request), _ = patched_coord._client.send.call_args
- response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])])
+ response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_commit_response.assert_called_with(
offsets, future, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
- (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]),
Errors.GroupAuthorizationFailedError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]),
Errors.OffsetMetadataTooLargeError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
Errors.InvalidCommitOffsetSizeError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
Errors.GroupLoadInProgressError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
Errors.GroupCoordinatorNotAvailableError, True, False),
- (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
Errors.NotCoordinatorForGroupError, True, False),
- (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
Errors.RequestTimedOutError, True, False),
- (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
Errors.CommitFailedError, False, True),
- (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]),
Errors.CommitFailedError, False, True),
- (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]),
Errors.CommitFailedError, False, True),
- (OffsetCommitResponse([('foobar', [(0, 17), (1, 17)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]),
Errors.InvalidTopicError, False, False),
- (OffsetCommitResponse([('foobar', [(0, 29), (1, 29)])]),
+ (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
])
def test_handle_offset_commit_response(patched_coord, offsets,
@@ -523,9 +522,9 @@ def test_send_offset_fetch_request_fail(patched_coord, partitions):
@pytest.mark.parametrize('api_version,req_type', [
- ((0, 8, 1), OffsetFetchRequest_v0),
- ((0, 8, 2), OffsetFetchRequest_v1),
- ((0, 9), OffsetFetchRequest_v1)])
+ ((0, 8, 1), OffsetFetchRequest[0]),
+ ((0, 8, 2), OffsetFetchRequest[1]),
+ ((0, 9), OffsetFetchRequest[1])])
def test_send_offset_fetch_request_versions(patched_coord, partitions,
api_version, req_type):
# assuming fixture sets coordinator=0, least_loaded_node=1
@@ -555,30 +554,30 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_fetch_request(partitions)
(node, request), _ = patched_coord._client.send.call_args
- response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])])
+ response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_fetch_response.assert_called_with(
future, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
- #(OffsetFetchResponse([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
+ #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
# Errors.GroupAuthorizationFailedError, False, False),
- #(OffsetFetchResponse([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
+ #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
# Errors.RequestTimedOutError, True, False),
- #(OffsetFetchResponse([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
+ #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
# Errors.RebalanceInProgressError, False, True),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
Errors.GroupLoadInProgressError, False, False),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
Errors.NotCoordinatorForGroupError, True, False),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
Errors.UnknownMemberIdError, False, True),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
Errors.IllegalGenerationError, False, True),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
- (OffsetFetchResponse([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
+ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
None, False, False),
])
def test_handle_offset_fetch_response(patched_coord, offsets,