summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-07 23:31:33 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-07 23:42:41 -0700
commit645129b00f63eab6368c5e9aca137463b63c0c9d (patch)
tree4d015bb5ee31b929331db17e322fd688c84ca515
parenta537eeec64581f8f51b55b0cc68f4267155337ca (diff)
downloadkafka-python-645129b00f63eab6368c5e9aca137463b63c0c9d.tar.gz
Add tests for Fetcher.fetched_records and _handle_fetch_response
-rw-r--r--test/test_fetcher.py132
1 files changed, 118 insertions, 14 deletions
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 86d154f..3bf26db 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -3,14 +3,18 @@ from __future__ import absolute_import
import pytest
-import itertools
from collections import OrderedDict
+import itertools
+import time
from kafka.client_async import KafkaClient
-from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError
+from kafka.consumer.fetcher import (
+ CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
+)
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
-from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.fetch import FetchRequest, FetchResponse
+from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
from kafka.structs import TopicPartition
from kafka.future import Future
@@ -31,28 +35,33 @@ def subscription_state():
@pytest.fixture
-def fetcher(client, subscription_state):
- subscription_state.subscribe(topics=['foobar'])
- assignment = [TopicPartition('foobar', i) for i in range(3)]
+def topic():
+ return 'foobar'
+
+
+@pytest.fixture
+def fetcher(client, subscription_state, topic):
+ subscription_state.subscribe(topics=[topic])
+ assignment = [TopicPartition(topic, i) for i in range(3)]
subscription_state.assign_from_subscribed(assignment)
for tp in assignment:
subscription_state.seek(tp, 0)
return Fetcher(client, subscription_state, Metrics())
-def test_send_fetches(fetcher, mocker):
+def test_send_fetches(fetcher, topic, mocker):
fetch_requests = [
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
fetcher.config['fetch_min_bytes'],
- [('foobar', [
+ [(topic, [
(0, 0, fetcher.config['max_partition_fetch_bytes']),
(1, 0, fetcher.config['max_partition_fetch_bytes']),
])]),
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
fetcher.config['fetch_min_bytes'],
- [('foobar', [
+ [(topic, [
(2, 0, fetcher.config['max_partition_fetch_bytes']),
])])
]
@@ -80,9 +89,9 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests])
-def test_update_fetch_positions(fetcher, mocker):
+def test_update_fetch_positions(fetcher, topic, mocker):
mocker.patch.object(fetcher, '_reset_offset')
- partition = TopicPartition('foobar', 0)
+ partition = TopicPartition(topic, 0)
# unassigned partition
fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
@@ -296,12 +305,107 @@ def test_partition_records_offset():
None, None, 'key', 'value', 'checksum', 0, 0)
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
- assert records.has_more()
+ assert len(records) > 0
msgs = records.take(1)
assert msgs[0].offset == 123
assert records.fetch_offset == 124
msgs = records.take(2)
assert len(msgs) == 2
- assert records.has_more()
+ assert len(records) > 0
records.discard()
- assert not records.has_more()
+ assert len(records) == 0
+
+
+def test_fetched_records(fetcher, topic, mocker):
+ tp = TopicPartition(topic, 0)
+ msgs = []
+ for i in range(10):
+ msg = Message(b'foo')
+ msgs.append((i, -1, msg))
+ completed_fetch = CompletedFetch(
+ tp, 0, 0, [0, 100, msgs],
+ mocker.MagicMock()
+ )
+ fetcher._completed_fetches.append(completed_fetch)
+ fetcher.config['check_crcs'] = False
+ records, partial = fetcher.fetched_records()
+ assert tp in records
+ assert len(records[tp]) == len(msgs)
+ assert all(map(lambda x: isinstance(x, ConsumerRecord), records[tp]))
+ assert partial is False
+
+
+@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [
+ (
+ FetchRequest[0](
+ -1, 100, 100,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[0](
+ [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),])]),]),
+ 1,
+ ),
+ (
+ FetchRequest[1](
+ -1, 100, 100,
+ [('foo', [(0, 0, 1000), (1, 0, 1000),])]),
+ FetchResponse[1](
+ 0,
+ [("foo", [
+ (0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
+ (1, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
+ ]),]),
+ 2,
+ ),
+ (
+ FetchRequest[2](
+ -1, 100, 100,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[2](
+ 0, [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=1)._encode_self()),])]),]),
+ 1,
+ ),
+ (
+ FetchRequest[3](
+ -1, 100, 100, 10000,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[3](
+ 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+ 1,
+ ),
+ (
+ FetchRequest[4](
+ -1, 100, 100, 10000, 0,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[4](
+ 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]),
+ 1,
+ ),
+ (
+ # This may only be used in broker-broker api calls
+ FetchRequest[5](
+ -1, 100, 100, 10000, 0,
+ [('foo', [(0, 0, 1000),])]),
+ FetchResponse[5](
+ 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]),
+ 1,
+ ),
+])
+def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions):
+ fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response)
+ assert len(fetcher._completed_fetches) == num_partitions
+
+
+def test__unpack_message_set():
+ pass
+
+
+def test__parse_record():
+ pass
+
+
+def test_message_generator():
+ pass
+
+
+def test__parse_fetched_data():
+ pass