diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-25 16:41:50 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-25 16:41:50 -0800 |
commit | 78bbc6d4d4ad67a7af32e10b08cc89ddfdd86322 (patch) | |
tree | 9ae57d5ed5fe0953945a2efee4c06701fd33ff33 | |
parent | d9f886b88de92b266e32b6c9c4706728d6169645 (diff) | |
download | kafka-python-78bbc6d4d4ad67a7af32e10b08cc89ddfdd86322.tar.gz |
Add Fetcher unit tests
-rw-r--r-- | test/test_fetcher.py | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/test/test_fetcher.py b/test/test_fetcher.py new file mode 100644 index 0000000..a252f6c --- /dev/null +++ b/test/test_fetcher.py @@ -0,0 +1,101 @@ +# pylint: skip-file +from __future__ import absolute_import + +import pytest + +from kafka.client_async import KafkaClient +from kafka.common import TopicPartition, OffsetAndMetadata +from kafka.consumer.fetcher import Fetcher +from kafka.consumer.subscription_state import SubscriptionState +from kafka.future import Future +from kafka.protocol.fetch import FetchRequest + +import kafka.common as Errors + + +@pytest.fixture +def client(mocker): + return mocker.Mock(spec=KafkaClient) + + +@pytest.fixture +def subscription_state(): + return SubscriptionState() + + +@pytest.fixture +def fetcher(client, subscription_state): + subscription_state.subscribe(topics=['foobar']) + assignment = [TopicPartition('foobar', i) for i in range(3)] + subscription_state.assign_from_subscribed(assignment) + for tp in assignment: + subscription_state.seek(tp, 0) + return Fetcher(client, subscription_state) + + +def test_init_fetches(fetcher, mocker): + fetch_requests = [ + FetchRequest(-1, fetcher.config['fetch_max_wait_ms'], + fetcher.config['fetch_min_bytes'], + [('foobar', [ + (0, 0, fetcher.config['max_partition_fetch_bytes']), + (1, 0, fetcher.config['max_partition_fetch_bytes']), + ])]), + FetchRequest(-1, fetcher.config['fetch_max_wait_ms'], + fetcher.config['fetch_min_bytes'], + [('foobar', [ + (2, 0, fetcher.config['max_partition_fetch_bytes']), + ])]) + ] + + mocker.patch.object(fetcher, '_create_fetch_requests', + return_value = dict(enumerate(fetch_requests))) + + fetcher._records.append('foobar') + ret = fetcher.init_fetches() + assert fetcher._create_fetch_requests.call_count == 0 + assert ret == [] + fetcher._records.clear() + + fetcher._iterator = 'foo' + ret = fetcher.init_fetches() + assert fetcher._create_fetch_requests.call_count == 0 + assert ret == [] + fetcher._iterator = None + + ret = fetcher.init_fetches() + for node, request in enumerate(fetch_requests): + fetcher._client.send.assert_any_call(node, request) + assert len(ret) == len(fetch_requests) + + +def test_update_fetch_positions(fetcher, mocker): + mocker.patch.object(fetcher, '_reset_offset') + partition = TopicPartition('foobar', 0) + + # unassigned partition + fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)]) + assert fetcher._reset_offset.call_count == 0 + + # fetchable partition (has offset, not paused) + fetcher.update_fetch_positions([partition]) + assert fetcher._reset_offset.call_count == 0 + + # partition needs reset, no committed offset + fetcher._subscriptions.need_offset_reset(partition) + fetcher._subscriptions.assignment[partition].awaiting_reset = False + fetcher.update_fetch_positions([partition]) + fetcher._reset_offset.assert_called_with(partition) + assert fetcher._subscriptions.assignment[partition].awaiting_reset is True + fetcher.update_fetch_positions([partition]) + fetcher._reset_offset.assert_called_with(partition) + + # partition needs reset, has committed offset + fetcher._reset_offset.reset_mock() + fetcher._subscriptions.need_offset_reset(partition) + fetcher._subscriptions.assignment[partition].awaiting_reset = False + fetcher._subscriptions.assignment[partition].committed = 123 + mocker.patch.object(fetcher._subscriptions, 'seek') + fetcher.update_fetch_positions([partition]) + assert fetcher._reset_offset.call_count == 0 + fetcher._subscriptions.seek.assert_called_with(partition, 123) |