summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-25 16:41:50 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 16:41:50 -0800
commit78bbc6d4d4ad67a7af32e10b08cc89ddfdd86322 (patch)
tree9ae57d5ed5fe0953945a2efee4c06701fd33ff33
parentd9f886b88de92b266e32b6c9c4706728d6169645 (diff)
downloadkafka-python-78bbc6d4d4ad67a7af32e10b08cc89ddfdd86322.tar.gz
Add Fetcher unit tests
-rw-r--r--test/test_fetcher.py101
1 files changed, 101 insertions, 0 deletions
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
new file mode 100644
index 0000000..a252f6c
--- /dev/null
+++ b/test/test_fetcher.py
@@ -0,0 +1,101 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.client_async import KafkaClient
+from kafka.common import TopicPartition, OffsetAndMetadata
+from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.subscription_state import SubscriptionState
+from kafka.future import Future
+from kafka.protocol.fetch import FetchRequest
+
+import kafka.common as Errors
+
+
+@pytest.fixture
+def client(mocker):
+ return mocker.Mock(spec=KafkaClient)
+
+
+@pytest.fixture
+def subscription_state():
+ return SubscriptionState()
+
+
+@pytest.fixture
+def fetcher(client, subscription_state):
+ subscription_state.subscribe(topics=['foobar'])
+ assignment = [TopicPartition('foobar', i) for i in range(3)]
+ subscription_state.assign_from_subscribed(assignment)
+ for tp in assignment:
+ subscription_state.seek(tp, 0)
+ return Fetcher(client, subscription_state)
+
+
+def test_init_fetches(fetcher, mocker):
+ fetch_requests = [
+ FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
+ fetcher.config['fetch_min_bytes'],
+ [('foobar', [
+ (0, 0, fetcher.config['max_partition_fetch_bytes']),
+ (1, 0, fetcher.config['max_partition_fetch_bytes']),
+ ])]),
+ FetchRequest(-1, fetcher.config['fetch_max_wait_ms'],
+ fetcher.config['fetch_min_bytes'],
+ [('foobar', [
+ (2, 0, fetcher.config['max_partition_fetch_bytes']),
+ ])])
+ ]
+
+ mocker.patch.object(fetcher, '_create_fetch_requests',
+ return_value = dict(enumerate(fetch_requests)))
+
+ fetcher._records.append('foobar')
+ ret = fetcher.init_fetches()
+ assert fetcher._create_fetch_requests.call_count == 0
+ assert ret == []
+ fetcher._records.clear()
+
+ fetcher._iterator = 'foo'
+ ret = fetcher.init_fetches()
+ assert fetcher._create_fetch_requests.call_count == 0
+ assert ret == []
+ fetcher._iterator = None
+
+ ret = fetcher.init_fetches()
+ for node, request in enumerate(fetch_requests):
+ fetcher._client.send.assert_any_call(node, request)
+ assert len(ret) == len(fetch_requests)
+
+
+def test_update_fetch_positions(fetcher, mocker):
+ mocker.patch.object(fetcher, '_reset_offset')
+ partition = TopicPartition('foobar', 0)
+
+ # unassigned partition
+ fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
+ assert fetcher._reset_offset.call_count == 0
+
+ # fetchable partition (has offset, not paused)
+ fetcher.update_fetch_positions([partition])
+ assert fetcher._reset_offset.call_count == 0
+
+ # partition needs reset, no committed offset
+ fetcher._subscriptions.need_offset_reset(partition)
+ fetcher._subscriptions.assignment[partition].awaiting_reset = False
+ fetcher.update_fetch_positions([partition])
+ fetcher._reset_offset.assert_called_with(partition)
+ assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
+ fetcher.update_fetch_positions([partition])
+ fetcher._reset_offset.assert_called_with(partition)
+
+ # partition needs reset, has committed offset
+ fetcher._reset_offset.reset_mock()
+ fetcher._subscriptions.need_offset_reset(partition)
+ fetcher._subscriptions.assignment[partition].awaiting_reset = False
+ fetcher._subscriptions.assignment[partition].committed = 123
+ mocker.patch.object(fetcher._subscriptions, 'seek')
+ fetcher.update_fetch_positions([partition])
+ assert fetcher._reset_offset.call_count == 0
+ fetcher._subscriptions.seek.assert_called_with(partition, 123)