diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-07 23:31:33 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-07 23:42:41 -0700 |
commit | 645129b00f63eab6368c5e9aca137463b63c0c9d (patch) | |
tree | 4d015bb5ee31b929331db17e322fd688c84ca515 | |
parent | a537eeec64581f8f51b55b0cc68f4267155337ca (diff) | |
download | kafka-python-645129b00f63eab6368c5e9aca137463b63c0c9d.tar.gz |
Add tests for Fetcher.fetched_records and _handle_fetch_response
-rw-r--r-- | test/test_fetcher.py | 132 |
1 files changed, 118 insertions, 14 deletions
diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 86d154f..3bf26db 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -3,14 +3,18 @@ from __future__ import absolute_import import pytest -import itertools from collections import OrderedDict +import itertools +import time from kafka.client_async import KafkaClient -from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError +from kafka.consumer.fetcher import ( + CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError +) from kafka.consumer.subscription_state import SubscriptionState from kafka.metrics import Metrics -from kafka.protocol.fetch import FetchRequest +from kafka.protocol.fetch import FetchRequest, FetchResponse +from kafka.protocol.message import Message from kafka.protocol.offset import OffsetResponse from kafka.structs import TopicPartition from kafka.future import Future @@ -31,28 +35,33 @@ def subscription_state(): @pytest.fixture -def fetcher(client, subscription_state): - subscription_state.subscribe(topics=['foobar']) - assignment = [TopicPartition('foobar', i) for i in range(3)] +def topic(): + return 'foobar' + + +@pytest.fixture +def fetcher(client, subscription_state, topic): + subscription_state.subscribe(topics=[topic]) + assignment = [TopicPartition(topic, i) for i in range(3)] subscription_state.assign_from_subscribed(assignment) for tp in assignment: subscription_state.seek(tp, 0) return Fetcher(client, subscription_state, Metrics()) -def test_send_fetches(fetcher, mocker): +def test_send_fetches(fetcher, topic, mocker): fetch_requests = [ FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], fetcher.config['fetch_min_bytes'], - [('foobar', [ + [(topic, [ (0, 0, fetcher.config['max_partition_fetch_bytes']), (1, 0, fetcher.config['max_partition_fetch_bytes']), ])]), FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], fetcher.config['fetch_min_bytes'], - [('foobar', [ + [(topic, [ (2, 0, fetcher.config['max_partition_fetch_bytes']), ])]) ] @@ -80,9 +89,9 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests]) -def test_update_fetch_positions(fetcher, mocker): +def test_update_fetch_positions(fetcher, topic, mocker): mocker.patch.object(fetcher, '_reset_offset') - partition = TopicPartition('foobar', 0) + partition = TopicPartition(topic, 0) # unassigned partition fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)]) @@ -296,12 +305,107 @@ def test_partition_records_offset(): None, None, 'key', 'value', 'checksum', 0, 0) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) - assert records.has_more() + assert len(records) > 0 msgs = records.take(1) assert msgs[0].offset == 123 assert records.fetch_offset == 124 msgs = records.take(2) assert len(msgs) == 2 - assert records.has_more() + assert len(records) > 0 records.discard() - assert not records.has_more() + assert len(records) == 0 + + +def test_fetched_records(fetcher, topic, mocker): + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._completed_fetches.append(completed_fetch) + fetcher.config['check_crcs'] = False + records, partial = fetcher.fetched_records() + assert tp in records + assert len(records[tp]) == len(msgs) + assert all(map(lambda x: isinstance(x, ConsumerRecord), records[tp])) + assert partial is False + + +@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [ + ( + FetchRequest[0]( + -1, 100, 100, + [('foo', [(0, 0, 1000),])]), + FetchResponse[0]( + [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),])]),]), + 1, + ), + ( + FetchRequest[1]( + -1, 100, 100, + [('foo', [(0, 0, 1000), (1, 0, 1000),])]), + FetchResponse[1]( + 0, + [("foo", [ + (0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]), + (1, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]), + ]),]), + 2, + ), + ( + FetchRequest[2]( + -1, 100, 100, + [('foo', [(0, 0, 1000),])]), + FetchResponse[2]( + 0, [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=1)._encode_self()),])]),]), + 1, + ), + ( + FetchRequest[3]( + -1, 100, 100, 10000, + [('foo', [(0, 0, 1000),])]), + FetchResponse[3]( + 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[4]( + -1, 100, 100, 10000, 0, + [('foo', [(0, 0, 1000),])]), + FetchResponse[4]( + 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]), + 1, + ), + ( + # This may only be used in broker-broker api calls + FetchRequest[5]( + -1, 100, 100, 10000, 0, + [('foo', [(0, 0, 1000),])]), + FetchResponse[5]( + 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]), + 1, + ), +]) +def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions): + fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response) + assert len(fetcher._completed_fetches) == num_partitions + + +def test__unpack_message_set(): + pass + + +def test__parse_record(): + pass + + +def test_message_generator(): + pass + + +def test__parse_fetched_data(): + pass |