summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorZack Dever <zdever@pandora.com>2016-04-07 17:46:55 -0700
committerZack Dever <zdever@pandora.com>2016-04-13 17:26:39 -0700
commite010669b602ffdfddde6fa2a381dad6c3be1f05d (patch)
tree453f771664ac89d18a1962891580c1a1d7b19b3f /test
parentcaf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (diff)
downloadkafka-python-e010669b602ffdfddde6fa2a381dad6c3be1f05d.tar.gz
Beginnings of metrics instrumentation in kafka consumer.
This adds the parent metrics instance to kafka consumer, which will eventually be used to instrument everything under consumer. To start I ported the java consumer coordinator metrics.
Diffstat (limited to 'test')
-rw-r--r--test/test_coordinator.py19
1 files changed, 13 insertions, 6 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 399609d..4b90f30 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -1,5 +1,6 @@
# pylint: skip-file
from __future__ import absolute_import
+import time
import pytest
@@ -14,6 +15,7 @@ from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics import Metrics
from kafka.protocol.commit import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
@@ -23,12 +25,14 @@ from kafka.util import WeakMethod
@pytest.fixture
def coordinator(conn):
- return ConsumerCoordinator(KafkaClient(), SubscriptionState())
+ return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(),
+ 'consumer')
def test_init(conn):
cli = KafkaClient()
- coordinator = ConsumerCoordinator(cli, SubscriptionState())
+ coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(),
+ 'consumer')
# metadata update on init
assert cli.cluster._need_update is True
@@ -38,6 +42,7 @@ def test_init(conn):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(conn, api_version):
coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ Metrics(), 'consumer',
enable_auto_commit=True,
group_id='foobar',
api_version=api_version)
@@ -354,6 +359,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ Metrics(), 'consumer',
api_version=api_version,
enable_auto_commit=enable,
group_id=group_id)
@@ -441,7 +447,7 @@ def test_send_offset_commit_request_failure(patched_coord, offsets):
assert future.exception is error
-def test_send_offset_commit_request_success(patched_coord, offsets):
+def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
_f = Future()
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_commit_request(offsets)
@@ -449,7 +455,7 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_commit_response.assert_called_with(
- offsets, future, response)
+ offsets, future, mocker.ANY, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
@@ -478,10 +484,11 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
(OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
])
-def test_handle_offset_commit_response(patched_coord, offsets,
+def test_handle_offset_commit_response(mocker, patched_coord, offsets,
response, error, dead, reassign):
future = Future()
- patched_coord._handle_offset_commit_response(offsets, future, response)
+ patched_coord._handle_offset_commit_response(offsets, future, time.time(),
+ response)
assert isinstance(future.exception, error)
assert patched_coord.coordinator_id is (None if dead else 0)
assert patched_coord._subscription.needs_partition_assignment is reassign