diff options
author | Taras <voyn1991@gmail.com> | 2017-10-11 20:02:18 +0300 |
---|---|---|
committer | Taras <voyn1991@gmail.com> | 2017-10-12 11:10:44 +0300 |
commit | e992fbfad926486766ff7b63a499f9cf29984fec (patch) | |
tree | 5d0ede2b60c84d06cecc9e9c2a1ee914d64f4bef | |
parent | 0557983b2ae05adc2f1076d5e670d693c8327ab9 (diff) | |
download | kafka-python-e992fbfad926486766ff7b63a499f9cf29984fec.tar.gz |
Fix tests and rebase problems
-rw-r--r-- | kafka/consumer/fetcher.py | 3 | ||||
-rw-r--r-- | test/test_fetcher.py | 122 |
2 files changed, 28 insertions, 97 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 493c1ff..dd90c2e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -728,7 +728,6 @@ class Fetcher(six.Iterator): def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition - partition = completed_fetch.partition_data fetch_offset = completed_fetch.fetched_offset num_bytes = 0 records_count = 0 @@ -736,7 +735,6 @@ class Fetcher(six.Iterator): error_code, highwater = completed_fetch.partition_data[:2] error_type = Errors.for_code(error_code) - records = MemoryRecords(partition_data[-1]) try: if not self._subscriptions.is_fetchable(tp): @@ -760,6 +758,7 @@ class Fetcher(six.Iterator): position) return None + records = MemoryRecords(completed_fetch.partition_data[-1]) if records.has_next(): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 5da597c..364a808 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -8,22 +8,20 @@ import itertools import time from kafka.client_async import KafkaClient -from kafka.codec import gzip_encode from kafka.consumer.fetcher import ( CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError ) from kafka.consumer.subscription_state import SubscriptionState from kafka.metrics import Metrics from kafka.protocol.fetch import FetchRequest, FetchResponse -from kafka.protocol.message import Message from kafka.protocol.offset import OffsetResponse -from kafka.protocol.types import Int64, Int32 from kafka.structs import TopicPartition from kafka.future import Future from kafka.errors import ( StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError ) +from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords @pytest.fixture @@ -51,6 +49,16 @@ def fetcher(client, subscription_state, topic): return Fetcher(client, subscription_state, Metrics()) +def _build_record_batch(msgs, compression=0): + builder = MemoryRecordsBuilder( + magic=1, compression_type=0, batch_size=9999999) + for msg in msgs: + key, value, timestamp = msg + builder.append(key=key, value=value, timestamp=timestamp) + builder.close() + return builder.buffer() + + def test_send_fetches(fetcher, topic, mocker): fetch_requests = [ FetchRequest[0]( @@ -321,12 +329,12 @@ def test_partition_records_offset(): def test_fetched_records(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) + msgs = [] for i in range(10): - msg = Message(b'foo') - msgs.append((i, -1, msg)) + msgs.append((None, b"foo", None)) completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, msgs], + tp, 0, 0, [0, 100, _build_record_batch(msgs)], mocker.MagicMock() ) fetcher._completed_fetches.append(completed_fetch) @@ -401,11 +409,12 @@ def test__unpack_message_set(fetcher): fetcher.config['check_crcs'] = False tp = TopicPartition('foo', 0) messages = [ - (0, None, Message(b'a')), - (1, None, Message(b'b')), - (2, None, Message(b'c')) + (None, b"a", None), + (None, b"b", None), + (None, b"c", None), ] - records = list(fetcher._unpack_message_set(tp, messages)) + memory_records = MemoryRecords(_build_record_batch(messages)) + records = list(fetcher._unpack_message_set(tp, memory_records)) assert len(records) == 3 assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) assert records[0].value == b'a' @@ -416,88 +425,14 @@ def test__unpack_message_set(fetcher): assert records[2].offset == 2 -def test__unpack_message_set_compressed_v0(fetcher): - fetcher.config['check_crcs'] = False - tp = TopicPartition('foo', 0) - messages = [ - (0, None, Message(b'a')), - (1, None, Message(b'b')), - (2, None, Message(b'c')), - ] - message_bytes = [] - for offset, _, m in messages: - encoded = m.encode() - message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded) - compressed_bytes = gzip_encode(b''.join(message_bytes)) - compressed_base_offset = 0 - compressed_msgs = [ - (compressed_base_offset, None, - Message(compressed_bytes, - magic=0, - attributes=Message.CODEC_GZIP)) - ] - records = list(fetcher._unpack_message_set(tp, compressed_msgs)) - assert len(records) == 3 - assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) - assert records[0].value == b'a' - assert records[1].value == b'b' - assert records[2].value == b'c' - assert records[0].offset == 0 - assert records[1].offset == 1 - assert records[2].offset == 2 - - -def test__unpack_message_set_compressed_v1(fetcher): - fetcher.config['check_crcs'] = False - tp = TopicPartition('foo', 0) - messages = [ - (0, None, Message(b'a')), - (1, None, Message(b'b')), - (2, None, Message(b'c')), - ] - message_bytes = [] - for offset, _, m in messages: - encoded = m.encode() - message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded) - compressed_bytes = gzip_encode(b''.join(message_bytes)) - compressed_base_offset = 10 - compressed_msgs = [ - (compressed_base_offset, None, - Message(compressed_bytes, - magic=1, - attributes=Message.CODEC_GZIP)) - ] - records = list(fetcher._unpack_message_set(tp, compressed_msgs)) - assert len(records) == 3 - assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) - assert records[0].value == b'a' - assert records[1].value == b'b' - assert records[2].value == b'c' - assert records[0].offset == 8 - assert records[1].offset == 9 - assert records[2].offset == 10 - - -def test__parse_record(fetcher): - tp = TopicPartition('foo', 0) - record = fetcher._parse_record(tp, 123, 456, Message(b'abc')) - assert record.topic == 'foo' - assert record.partition == 0 - assert record.offset == 123 - assert record.timestamp == 456 - assert record.value == b'abc' - assert record.key is None - - def test__message_generator(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) msgs = [] for i in range(10): - msg = Message(b'foo') - msgs.append((i, -1, msg)) + msgs.append((None, b"foo", None)) completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, msgs], + tp, 0, 0, [0, 100, _build_record_batch(msgs)], mocker.MagicMock() ) fetcher._completed_fetches.append(completed_fetch) @@ -513,10 +448,9 @@ def test__parse_fetched_data(fetcher, topic, mocker): tp = TopicPartition(topic, 0) msgs = [] for i in range(10): - msg = Message(b'foo') - msgs.append((i, -1, msg)) + msgs.append((None, b"foo", None)) completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, msgs], + tp, 0, 0, [0, 100, _build_record_batch(msgs)], mocker.MagicMock() ) partition_record = fetcher._parse_fetched_data(completed_fetch) @@ -529,10 +463,9 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker): tp = TopicPartition(topic, 0) msgs = [] for i in range(10): - msg = Message(b'foo') - msgs.append((i, -1, msg)) + msgs.append((None, b"foo", None)) completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, msgs], + tp, 0, 0, [0, 100, _build_record_batch(msgs)], mocker.MagicMock() ) fetcher._subscriptions.pause(tp) @@ -545,10 +478,9 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): tp = TopicPartition(topic, 0) msgs = [] for i in range(10): - msg = Message(b'foo') - msgs.append((i, -1, msg)) + msgs.append((None, b"foo", None)) completed_fetch = CompletedFetch( - tp, 10, 0, [0, 100, msgs], + tp, 10, 0, [0, 100, _build_record_batch(msgs)], mocker.MagicMock() ) partition_record = fetcher._parse_fetched_data(completed_fetch) |