author     Taras Voinarovskyi <voyn1991@gmail.com>   2017-10-14 23:06:27 +0300
committer  GitHub <noreply@github.com>               2017-10-14 23:06:27 +0300
commit     fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (patch)
tree       52e5860b1f8738b15e7c757c205961b761badd2b /test
parent     dd8e33654f2270097d6c1373dc272153670e48f8 (diff)
parent     365cae02da59721df77923bb5f5a2d94a84b2e83 (diff)
download   kafka-python-fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f.tar.gz
Merge pull request #1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
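The refactor replaces the protocol-level Message/MessageSet classes with a dedicated kafka.record package. A minimal sketch of the new builder/reader round trip, mirroring the usage exercised by test/record/test_legacy_records.py below (magic v1, no compression; the values and the print loop are illustrative only):

    from kafka.record.legacy_records import (
        LegacyRecordBatch, LegacyRecordBatchBuilder
    )

    # Build a legacy (v0/v1) record batch in memory...
    builder = LegacyRecordBatchBuilder(
        magic=1, compression_type=0, batch_size=9999999)
    builder.append(0, timestamp=9999999, key=b"test", value=b"Super")
    buffer = builder.build()

    # ...and read it back through the batch reader.
    batch = LegacyRecordBatch(bytes(buffer), 1)
    for msg in batch:
        print(msg.offset, msg.timestamp, msg.key, msg.value)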
Diffstat (limited to 'test')
-rw-r--r--  test/record/test_legacy_records.py    85
-rw-r--r--  test/record/test_records.py          108
-rw-r--r--  test/test_buffer.py                   72
-rw-r--r--  test/test_consumer_integration.py      6
-rw-r--r--  test/test_fetcher.py                 122
-rw-r--r--  test/test_protocol.py                  5
-rw-r--r--  test/test_sender.py                   18
7 files changed, 236 insertions, 180 deletions
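On the consumer side, raw fetched bytes are now wrapped in MemoryRecords and drained batch by batch. A short sketch of that read path, following the new test/record/test_records.py (here data_bytes stands in for the records payload of a FetchResponse):

    from kafka.record import MemoryRecords

    records = MemoryRecords(data_bytes)
    # next_batch() returns None once only truncated/invalid bytes remain.
    while records.has_next():
        batch = records.next_batch()
        for record in batch:
            print(record.offset, record.key, record.value)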
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
new file mode 100644
index 0000000..2d76695
--- /dev/null
+++ b/test/record/test_legacy_records.py
@@ -0,0 +1,85 @@
+import pytest
+from kafka.record.legacy_records import (
+    LegacyRecordBatch, LegacyRecordBatchBuilder
+)
+from kafka.protocol.message import Message
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_no_compression(magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+    builder.append(
+        0, timestamp=9999999, key=b"test", value=b"Super")
+    buffer = builder.build()
+
+    batch = LegacyRecordBatch(bytes(buffer), magic)
+    msgs = list(batch)
+    assert len(msgs) == 1
+    msg = msgs[0]
+
+    assert msg.offset == 0
+    assert msg.timestamp == (9999999 if magic else None)
+    assert msg.timestamp_type == (0 if magic else None)
+    assert msg.key == b"test"
+    assert msg.value == b"Super"
+    assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff
+
+
+@pytest.mark.parametrize("compression_type", [
+    Message.CODEC_GZIP,
+    Message.CODEC_SNAPPY,
+    Message.CODEC_LZ4
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=compression_type, batch_size=9999999)
+    for offset in range(10):
+        builder.append(
+            offset, timestamp=9999999, key=b"test", value=b"Super")
+    buffer = builder.build()
+
+    batch = LegacyRecordBatch(bytes(buffer), magic)
+    msgs = list(batch)
+
+    expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
+    for offset, msg in enumerate(msgs):
+        assert msg.offset == offset
+        assert msg.timestamp == (9999999 if magic else None)
+        assert msg.timestamp_type == (0 if magic else None)
+        assert msg.key == b"test"
+        assert msg.value == b"Super"
+        assert msg.checksum == expected_checksum
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_written_bytes_equals_size_in_bytes(magic):
+    key = b"test"
+    value = b"Super"
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+
+    size_in_bytes = builder.size_in_bytes(
+        0, timestamp=9999999, key=key, value=value)
+
+    pos = builder.size()
+    builder.append(0, timestamp=9999999, key=key, value=value)
+
+    assert builder.size() - pos == size_in_bytes
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_estimate_size_in_bytes_bigger_than_batch(magic):
+    key = b"Super Key"
+    value = b"1" * 100
+    estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
+        magic, compression_type=0, key=key, value=value)
+
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+    builder.append(
+        0, timestamp=9999999, key=key, value=value)
+    buf = builder.build()
+    assert len(buf) <= estimate_size, \
+        "Estimate should always be upper bound"
diff --git a/test/record/test_records.py b/test/record/test_records.py
new file mode 100644
index 0000000..fc3eaca
--- /dev/null
+++ b/test/record/test_records.py
@@ -0,0 +1,108 @@
+import pytest
+from kafka.record import MemoryRecords
+from kafka.errors import CorruptRecordException
+
+record_batch_data_v1 = [
+    # First Message value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
+    b'\x00\x01^\x18g\xab\xae\xff\xff\xff\xff\x00\x00\x00\x03123',
+    # Second Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x16\xef\x98\xc9 \x01\x00'
+    b'\x00\x00\x01^\x18g\xaf\xc0\xff\xff\xff\xff\x00\x00\x00\x00',
+    # Third Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x16_\xaf\xfb^\x01\x00\x00'
+    b'\x00\x01^\x18g\xb0r\xff\xff\xff\xff\x00\x00\x00\x00',
+    # Fourth Message value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x19\xa8\x12W \x01\x00\x00'
+    b'\x00\x01^\x18g\xb8\x03\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+# This is real live data from Kafka 10 broker
+record_batch_data_v0 = [
+    # First Message value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+    b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123',
+    # Second Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+    b'\xff\xff\xff\x00\x00\x00\x00',
+    # Third Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+    b'\xff\xff\xff\x00\x00\x00\x00',
+    # Fourth Message value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+    b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+
+def test_memory_records_v1():
+    data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
+    records = MemoryRecords(data_bytes)
+
+    assert records.size_in_bytes() == 146
+    assert records.valid_bytes() == 142
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp == 1503648000942
+    assert recs[0].timestamp_type == 0
+    assert recs[0].checksum == 1199974594 & 0xffffffff
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
+def test_memory_records_v0():
+    data_bytes = b"".join(record_batch_data_v0)
+    records = MemoryRecords(data_bytes + b"\x00" * 4)
+
+    assert records.size_in_bytes() == 114
+    assert records.valid_bytes() == 110
+
+    records = MemoryRecords(data_bytes)
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp is None
+    assert recs[0].timestamp_type is None
+    assert recs[0].checksum == -22012481 & 0xffffffff
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
+def test_memory_records_corrupt():
+    records = MemoryRecords(b"")
+    assert records.size_in_bytes() == 0
+    assert records.valid_bytes() == 0
+    assert records.has_next() is False
+
+    records = MemoryRecords(b"\x00\x00\x00")
+    assert records.size_in_bytes() == 3
+    assert records.valid_bytes() == 0
+    assert records.has_next() is False
+
+    records = MemoryRecords(
+        b"\x00\x00\x00\x00\x00\x00\x00\x03"  # Offset=3
+        b"\x00\x00\x00\x03"  # Length=3
+        b"\xfe\xb0\x1d",  # Some random bytes
+    )
+    with pytest.raises(CorruptRecordException):
+        records.next_batch()
diff --git a/test/test_buffer.py b/test/test_buffer.py
deleted file mode 100644
index db6cbb3..0000000
--- a/test/test_buffer.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# pylint: skip-file
-from __future__ import absolute_import
-
-import io
-import platform
-
-import pytest
-
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.protocol.message import Message, MessageSet
-
-
-def test_buffer_close():
-    records = MessageSetBuffer(io.BytesIO(), 100000)
-    orig_msg = Message(b'foobar')
-    records.append(1234, orig_msg)
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 1234
-    assert msg == orig_msg
-
-    # Closing again should work fine
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 1234
-    assert msg == orig_msg
-
-
-@pytest.mark.parametrize('compression', [
-    'gzip',
-    'snappy',
-    pytest.mark.skipif(platform.python_implementation() == 'PyPy',
-                       reason='python-lz4 crashes on older versions of pypy')('lz4'),
-])
-def test_compressed_buffer_close(compression):
-    records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
-    orig_msg = Message(b'foobar')
-    records.append(1234, orig_msg)
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 0
-    assert msg.is_compressed()
-
-    msgset = msg.decompress()
-    (offset, size, msg) = msgset[0]
-    assert not msg.is_compressed()
-    assert offset == 1234
-    assert msg == orig_msg
-
-    # Closing again should work fine
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 0
-    assert msg.is_compressed()
-
-    msgset = msg.decompress()
-    (offset, size, msg) = msgset[0]
-    assert not msg.is_compressed()
-    assert offset == 1234
-    assert msg == orig_msg
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17e7401..d1843b3 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -26,6 +26,8 @@ from test.testutil import (
 
 
 class TestConsumerIntegration(KafkaIntegrationTestCase):
+    maxDiff = None
+
     @classmethod
     def setUpClass(cls):
         if not os.environ.get('KAFKA_VERSION'):
@@ -648,10 +650,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         kafka_producer = self.kafka_producer()
         early_msg = kafka_producer.send(
             self.topic, partition=0, value=b"first",
-            timestamp_ms=early_time).get()
+            timestamp_ms=early_time).get(1)
         late_msg = kafka_producer.send(
             self.topic, partition=0, value=b"last",
-            timestamp_ms=late_time).get()
+            timestamp_ms=late_time).get(1)
 
         consumer = self.kafka_consumer()
         offsets = consumer.offsets_for_times({tp: early_time})
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 5da597c..364a808 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,22 +8,20 @@ import itertools
 import time
 
 from kafka.client_async import KafkaClient
-from kafka.codec import gzip_encode
 from kafka.consumer.fetcher import (
     CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
 )
 from kafka.consumer.subscription_state import SubscriptionState
 from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.message import Message
 from kafka.protocol.offset import OffsetResponse
-from kafka.protocol.types import Int64, Int32
 from kafka.structs import TopicPartition
 from kafka.future import Future
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
     UnknownTopicOrPartitionError, OffsetOutOfRangeError
 )
+from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
 
 
 @pytest.fixture
@@ -51,6 +49,16 @@ def fetcher(client, subscription_state, topic):
     return Fetcher(client, subscription_state, Metrics())
 
 
+def _build_record_batch(msgs, compression=0):
+    builder = MemoryRecordsBuilder(
+        magic=1, compression_type=0, batch_size=9999999)
+    for msg in msgs:
+        key, value, timestamp = msg
+        builder.append(key=key, value=value, timestamp=timestamp)
+    builder.close()
+    return builder.buffer()
+
+
 def test_send_fetches(fetcher, topic, mocker):
     fetch_requests = [
         FetchRequest[0](
@@ -321,12 +329,12 @@ def test_partition_records_offset():
 def test_fetched_records(fetcher, topic, mocker):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition(topic, 0)
+
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     fetcher._completed_fetches.append(completed_fetch)
@@ -401,11 +409,12 @@ def test__unpack_message_set(fetcher):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition('foo', 0)
     messages = [
-        (0, None, Message(b'a')),
-        (1, None, Message(b'b')),
-        (2, None, Message(b'c'))
+        (None, b"a", None),
+        (None, b"b", None),
+        (None, b"c", None),
     ]
-    records = list(fetcher._unpack_message_set(tp, messages))
+    memory_records = MemoryRecords(_build_record_batch(messages))
+    records = list(fetcher._unpack_message_set(tp, memory_records))
     assert len(records) == 3
     assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
     assert records[0].value == b'a'
@@ -416,88 +425,14 @@
     assert records[2].offset == 2
 
 
-def test__unpack_message_set_compressed_v0(fetcher):
-    fetcher.config['check_crcs'] = False
-    tp = TopicPartition('foo', 0)
-    messages = [
-        (0, None, Message(b'a')),
-        (1, None, Message(b'b')),
-        (2, None, Message(b'c')),
-    ]
-    message_bytes = []
-    for offset, _, m in messages:
-        encoded = m.encode()
-        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
-    compressed_bytes = gzip_encode(b''.join(message_bytes))
-    compressed_base_offset = 0
-    compressed_msgs = [
-        (compressed_base_offset, None,
-         Message(compressed_bytes,
-                 magic=0,
-                 attributes=Message.CODEC_GZIP))
-    ]
-    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
-    assert len(records) == 3
-    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
-    assert records[0].value == b'a'
-    assert records[1].value == b'b'
-    assert records[2].value == b'c'
-    assert records[0].offset == 0
-    assert records[1].offset == 1
-    assert records[2].offset == 2
-
-
-def test__unpack_message_set_compressed_v1(fetcher):
-    fetcher.config['check_crcs'] = False
-    tp = TopicPartition('foo', 0)
-    messages = [
-        (0, None, Message(b'a')),
-        (1, None, Message(b'b')),
-        (2, None, Message(b'c')),
-    ]
-    message_bytes = []
-    for offset, _, m in messages:
-        encoded = m.encode()
-        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
-    compressed_bytes = gzip_encode(b''.join(message_bytes))
-    compressed_base_offset = 10
-    compressed_msgs = [
-        (compressed_base_offset, None,
-         Message(compressed_bytes,
-                 magic=1,
-                 attributes=Message.CODEC_GZIP))
-    ]
-    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
-    assert len(records) == 3
-    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
-    assert records[0].value == b'a'
-    assert records[1].value == b'b'
-    assert records[2].value == b'c'
-    assert records[0].offset == 8
-    assert records[1].offset == 9
-    assert records[2].offset == 10
-
-
-def test__parse_record(fetcher):
-    tp = TopicPartition('foo', 0)
-    record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
-    assert record.topic == 'foo'
-    assert record.partition == 0
-    assert record.offset == 123
-    assert record.timestamp == 456
-    assert record.value == b'abc'
-    assert record.key is None
-
-
 def test__message_generator(fetcher, topic, mocker):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     fetcher._completed_fetches.append(completed_fetch)
@@ -513,10 +448,9 @@ def test__parse_fetched_data(fetcher, topic, mocker):
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     partition_record = fetcher._parse_fetched_data(completed_fetch)
@@ -529,10 +463,9 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker):
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     fetcher._subscriptions.pause(tp)
@@ -545,10 +478,9 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 10, 0, [0, 100, msgs],
+        tp, 10, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     partition_record = fetcher._parse_fetched_data(completed_fetch)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0203614..d963650 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -260,13 +260,14 @@ def test_decode_fetch_response_partial():
         struct.pack('>i', 8),  # Length of value
         b'ar',  # Value (truncated)
     ])
-
     resp = FetchResponse[0].decode(io.BytesIO(encoded))
     assert len(resp.topics) == 1
     topic, partitions = resp.topics[0]
     assert topic == 'foobar'
     assert len(partitions) == 2
-    m1 = partitions[0][3]
+
+    m1 = MessageSet.decode(
+        partitions[0][3], bytes_to_read=len(partitions[0][3]))
     assert len(m1) == 2
     assert m1[1] == (None, None, PartialMessage())
diff --git a/test/test_sender.py b/test/test_sender.py
index f37e194..2a68def 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -1,20 +1,17 @@
 # pylint: skip-file
 from __future__ import absolute_import
 
-import io
-
 import pytest
+import io
 
 from kafka.client_async import KafkaClient
 from kafka.cluster import ClusterMetadata
-import kafka.errors as Errors
-from kafka.future import Future
 from kafka.metrics import Metrics
-from kafka.producer.buffer import MessageSetBuffer
 from kafka.protocol.produce import ProduceRequest
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
 from kafka.producer.sender import Sender
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition
 
 
 @pytest.fixture
@@ -47,7 +44,10 @@ def sender(client, accumulator, metrics):
 def test_produce_request(sender, mocker, api_version, produce_version):
     sender.config['api_version'] = api_version
     tp = TopicPartition('foo', 0)
-    records = MessageSetBuffer(io.BytesIO(), 100000)
-    batch = RecordBatch(tp, records)
+    buffer = io.BytesIO()
+    records = MemoryRecordsBuilder(
+        magic=1, compression_type=0, batch_size=100000)
+    batch = ProducerBatch(tp, records, buffer)
+    records.close()
     produce_request = sender._produce_request(0, 0, 0, [batch])
     assert isinstance(produce_request, ProduceRequest[produce_version])
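
For the producer path, the counterpart to the deleted MessageSetBuffer is MemoryRecordsBuilder, as the updated test_fetcher.py and test_sender.py show. A sketch along the lines of the _build_record_batch helper above (magic v1, no compression; values are illustrative):

    from kafka.record.memory_records import (
        MemoryRecords, MemoryRecordsBuilder
    )

    builder = MemoryRecordsBuilder(
        magic=1, compression_type=0, batch_size=9999999)
    for value in (b"a", b"b", b"c"):
        builder.append(key=None, value=value, timestamp=None)
    builder.close()  # seal the batch before reading the buffer

    # The closed buffer round-trips through the consumer-side reader.
    records = MemoryRecords(builder.buffer())
    assert records.has_next()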