diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-08 10:50:06 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-08 10:50:06 -0700 |
commit | bc573e3d63a687903a9be2e1b3da2f943a7208e1 (patch) | |
tree | bea57a0f395112acab103392988f26c62510c2f4 | |
parent | a7d8ae5411cc74b119ca91a8ff84ddc68cd47c93 (diff) | |
download | kafka-python-bc573e3d63a687903a9be2e1b3da2f943a7208e1.tar.gz |
More tests [KAFKA_3977_defer_fetch_parsing]
-rw-r--r-- | kafka/consumer/fetcher.py | 5 | ||||
-rw-r--r-- | test/test_fetcher.py | 210 |
2 files changed, 200 insertions, 15 deletions
```diff
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d059a10..c4fa546 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -400,6 +400,11 @@ class Fetcher(six.Iterator):
 
             tp = self._next_partition_records.topic_partition
 
+            # We can ignore any prior signal to drop pending message sets
+            # because we are starting from a fresh one where fetch_offset == position
+            # i.e., the user seek()'d to this position
+            self._subscriptions.assignment[tp].drop_pending_message_set = False
+
             for msg in self._next_partition_records.take():
 
                 # Because we are in a generator, it is possible for
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 3bf26db..5da597c 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,6 +8,7 @@ import itertools
 import time
 
 from kafka.client_async import KafkaClient
+from kafka.codec import gzip_encode
 from kafka.consumer.fetcher import (
     CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
 )
@@ -16,11 +17,12 @@ from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest, FetchResponse
 from kafka.protocol.message import Message
 from kafka.protocol.offset import OffsetResponse
+from kafka.protocol.types import Int64, Int32
 from kafka.structs import TopicPartition
 from kafka.future import Future
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
-    UnknownTopicOrPartitionError
+    UnknownTopicOrPartitionError, OffsetOutOfRangeError
 )
 
 
@@ -294,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker):
 
 
 def test_partition_records_offset():
-    """Test that compressed messagesets are handle correctly
+    """Test that compressed messagesets are handled correctly
     when fetch offset is in the middle of the message list
     """
     batch_start = 120
@@ -317,6 +319,7 @@ def test_partition_records_offset():
 
 
 def test_fetched_records(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
@@ -327,7 +330,6 @@ def test_fetched_records(fetcher, topic, mocker):
         mocker.MagicMock()
     )
     fetcher._completed_fetches.append(completed_fetch)
-    fetcher.config['check_crcs'] = False
     records, partial = fetcher.fetched_records()
     assert tp in records
     assert len(records[tp]) == len(msgs)
@@ -341,7 +343,7 @@ def test_fetched_records(fetcher, topic, mocker):
             -1, 100, 100,
             [('foo', [(0, 0, 1000),])]),
         FetchResponse[0](
-            [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),])]),]),
+            [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
         1,
     ),
     (
@@ -351,8 +353,8 @@ def test_fetched_records(fetcher, topic, mocker):
         FetchResponse[1](
             0,
             [("foo", [
-                (0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
-                (1, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
+                (0, 0, 1000, [(0, b'xxx'),]),
+                (1, 0, 1000, [(0, b'xxx'),]),
             ]),]),
         2,
     ),
@@ -361,7 +363,7 @@ def test_fetched_records(fetcher, topic, mocker):
             -1, 100, 100,
             [('foo', [(0, 0, 1000),])]),
         FetchResponse[2](
-            0, [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=1)._encode_self()),])]),]),
+            0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
         1,
     ),
     (
@@ -395,17 +397,195 @@ def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_part
     assert len(fetcher._completed_fetches) == num_partitions
 
 
-def test__unpack_message_set():
-    pass
+def test__unpack_message_set(fetcher):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (0, None, Message(b'a')),
+        (1, None, Message(b'b')),
+        (2, None, Message(b'c'))
+    ]
+    records = list(fetcher._unpack_message_set(tp, messages))
+    assert len(records) == 3
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+    assert records[0].value == b'a'
+    assert records[1].value == b'b'
+    assert records[2].value == b'c'
+    assert records[0].offset == 0
+    assert records[1].offset == 1
+    assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v0(fetcher):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (0, None, Message(b'a')),
+        (1, None, Message(b'b')),
+        (2, None, Message(b'c')),
+    ]
+    message_bytes = []
+    for offset, _, m in messages:
+        encoded = m.encode()
+        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+    compressed_bytes = gzip_encode(b''.join(message_bytes))
+    compressed_base_offset = 0
+    compressed_msgs = [
+        (compressed_base_offset, None,
+         Message(compressed_bytes,
+                 magic=0,
+                 attributes=Message.CODEC_GZIP))
+    ]
+    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+    assert len(records) == 3
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+    assert records[0].value == b'a'
+    assert records[1].value == b'b'
+    assert records[2].value == b'c'
+    assert records[0].offset == 0
+    assert records[1].offset == 1
+    assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v1(fetcher):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (0, None, Message(b'a')),
+        (1, None, Message(b'b')),
+        (2, None, Message(b'c')),
+    ]
+    message_bytes = []
+    for offset, _, m in messages:
+        encoded = m.encode()
+        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+    compressed_bytes = gzip_encode(b''.join(message_bytes))
+    compressed_base_offset = 10
+    compressed_msgs = [
+        (compressed_base_offset, None,
+         Message(compressed_bytes,
+                 magic=1,
+                 attributes=Message.CODEC_GZIP))
+    ]
+    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+    assert len(records) == 3
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+    assert records[0].value == b'a'
+    assert records[1].value == b'b'
+    assert records[2].value == b'c'
+    assert records[0].offset == 8
+    assert records[1].offset == 9
+    assert records[2].offset == 10
+
+
+def test__parse_record(fetcher):
+    tp = TopicPartition('foo', 0)
+    record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
+    assert record.topic == 'foo'
+    assert record.partition == 0
+    assert record.offset == 123
+    assert record.timestamp == 456
+    assert record.value == b'abc'
+    assert record.key is None
 
 
-def test__parse_record():
-    pass
+def test__message_generator(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    fetcher._completed_fetches.append(completed_fetch)
+    for i in range(10):
+        msg = next(fetcher)
+        assert isinstance(msg, ConsumerRecord)
+        assert msg.offset == i
+        assert msg.value == b'foo'
 
 
-def test_message_generator():
-    pass
+def test__parse_fetched_data(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert isinstance(partition_record, fetcher.PartitionRecords)
+    assert len(partition_record) == 10
 
 
-def test__parse_fetched_data():
-    pass
+def test__parse_fetched_data__paused(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    fetcher._subscriptions.pause(tp)
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+
+
+def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 10, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+
+
+def test__parse_fetched_data__not_leader(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None],
+        mocker.MagicMock()
+    )
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+    fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None],
+        mocker.MagicMock()
+    )
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+    fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
+        mocker.MagicMock()
+    )
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+    assert fetcher._subscriptions.assignment[tp].awaiting_reset is True
```