diff options
author | Zack Dever <zdever@pandora.com> | 2016-04-07 17:46:55 -0700 |
---|---|---|
committer | Zack Dever <zdever@pandora.com> | 2016-04-13 17:26:39 -0700 |
commit | e010669b602ffdfddde6fa2a381dad6c3be1f05d (patch) | |
tree | 453f771664ac89d18a1962891580c1a1d7b19b3f /test | |
parent | caf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (diff) | |
download | kafka-python-e010669b602ffdfddde6fa2a381dad6c3be1f05d.tar.gz |
Beginnings of metrics instrumentation in kafka consumer.
This adds the parent metrics instance to kafka consumer, which will
eventually be used to instrument everything under consumer. To start
I ported the java consumer coordinator metrics.
Diffstat (limited to 'test')
-rw-r--r-- | test/test_coordinator.py | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 399609d..4b90f30 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -1,5 +1,6 @@ # pylint: skip-file from __future__ import absolute_import +import time import pytest @@ -14,6 +15,7 @@ from kafka.coordinator.protocol import ( ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) import kafka.errors as Errors from kafka.future import Future +from kafka.metrics import Metrics from kafka.protocol.commit import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse) @@ -23,12 +25,14 @@ from kafka.util import WeakMethod @pytest.fixture def coordinator(conn): - return ConsumerCoordinator(KafkaClient(), SubscriptionState()) + return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(), + 'consumer') def test_init(conn): cli = KafkaClient() - coordinator = ConsumerCoordinator(cli, SubscriptionState()) + coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(), + 'consumer') # metadata update on init assert cli.cluster._need_update is True @@ -38,6 +42,7 @@ def test_init(conn): @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) def test_autocommit_enable_api_version(conn, api_version): coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + Metrics(), 'consumer', enable_auto_commit=True, group_id='foobar', api_version=api_version) @@ -354,6 +359,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + Metrics(), 'consumer', api_version=api_version, enable_auto_commit=enable, group_id=group_id) @@ -441,7 +447,7 @@ def test_send_offset_commit_request_failure(patched_coord, offsets): assert future.exception is error -def test_send_offset_commit_request_success(patched_coord, offsets): +def test_send_offset_commit_request_success(mocker, patched_coord, offsets): _f = Future() patched_coord._client.send.return_value = _f future = patched_coord._send_offset_commit_request(offsets) @@ -449,7 +455,7 @@ def test_send_offset_commit_request_success(patched_coord, offsets): response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]) _f.success(response) patched_coord._handle_offset_commit_response.assert_called_with( - offsets, future, response) + offsets, future, mocker.ANY, response) @pytest.mark.parametrize('response,error,dead,reassign', [ @@ -478,10 +484,11 @@ def test_send_offset_commit_request_success(patched_coord, offsets): (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), Errors.TopicAuthorizationFailedError, False, False), ]) -def test_handle_offset_commit_response(patched_coord, offsets, +def test_handle_offset_commit_response(mocker, patched_coord, offsets, response, error, dead, reassign): future = Future() - patched_coord._handle_offset_commit_response(offsets, future, response) + patched_coord._handle_offset_commit_response(offsets, future, time.time(), + response) assert isinstance(future.exception, error) assert patched_coord.coordinator_id is (None if dead else 0) assert patched_coord._subscription.needs_partition_assignment is reassign |