author | Taras <voyn1991@gmail.com> | 2017-10-10 00:13:16 +0300
committer | Taras <voyn1991@gmail.com> | 2017-10-11 18:09:17 +0300
commit | fbea5f04bccd28f3aa15a1711548b131504591ac (patch)
tree | 1c8a0efe687c2ace72fa146b4f03e15def8e3a95 /test
parent | f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
download | kafka-python-fbea5f04bccd28f3aa15a1711548b131504591ac.tar.gz
Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
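
The new tests under test/record/ exercise the replacement API. As a rough round-trip sketch of how the new builder/reader pair is used (a minimal example assembled only from the calls that appear in the new tests below; the argument values are arbitrary):

    from kafka.record.legacy_records import (
        LegacyRecordBatch, LegacyRecordBatchBuilder
    )

    # Build a magic v1 batch containing one uncompressed record.
    builder = LegacyRecordBatchBuilder(
        magic=1, compression_type=0, batch_size=9999999)
    builder.append(0, timestamp=9999999, key=b"test", value=b"Super")
    buffer = builder.build()

    # Read the serialized bytes back; iterating the batch yields the records.
    batch = LegacyRecordBatch(bytes(buffer), 1)
    for msg in batch:
        assert msg.key == b"test" and msg.value == b"Super"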
Diffstat (limited to 'test')
-rw-r--r-- | test/record/test_legacy_records.py | 85
-rw-r--r-- | test/record/test_records.py | 108
-rw-r--r-- | test/test_buffer.py | 72
-rw-r--r-- | test/test_consumer_integration.py | 6
-rw-r--r-- | test/test_protocol.py | 5
-rw-r--r-- | test/test_sender.py | 18
6 files changed, 209 insertions, 85 deletions
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
new file mode 100644
index 0000000..2d76695
--- /dev/null
+++ b/test/record/test_legacy_records.py
@@ -0,0 +1,85 @@
+import pytest
+from kafka.record.legacy_records import (
+    LegacyRecordBatch, LegacyRecordBatchBuilder
+)
+from kafka.protocol.message import Message
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_no_compression(magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+    builder.append(
+        0, timestamp=9999999, key=b"test", value=b"Super")
+    buffer = builder.build()
+
+    batch = LegacyRecordBatch(bytes(buffer), magic)
+    msgs = list(batch)
+    assert len(msgs) == 1
+    msg = msgs[0]
+
+    assert msg.offset == 0
+    assert msg.timestamp == (9999999 if magic else None)
+    assert msg.timestamp_type == (0 if magic else None)
+    assert msg.key == b"test"
+    assert msg.value == b"Super"
+    assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff
+
+
+@pytest.mark.parametrize("compression_type", [
+    Message.CODEC_GZIP,
+    Message.CODEC_SNAPPY,
+    Message.CODEC_LZ4
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=compression_type, batch_size=9999999)
+    for offset in range(10):
+        builder.append(
+            offset, timestamp=9999999, key=b"test", value=b"Super")
+    buffer = builder.build()
+
+    batch = LegacyRecordBatch(bytes(buffer), magic)
+    msgs = list(batch)
+
+    expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
+    for offset, msg in enumerate(msgs):
+        assert msg.offset == offset
+        assert msg.timestamp == (9999999 if magic else None)
+        assert msg.timestamp_type == (0 if magic else None)
+        assert msg.key == b"test"
+        assert msg.value == b"Super"
+        assert msg.checksum == expected_checksum
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_written_bytes_equals_size_in_bytes(magic):
+    key = b"test"
+    value = b"Super"
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+
+    size_in_bytes = builder.size_in_bytes(
+        0, timestamp=9999999, key=key, value=value)
+
+    pos = builder.size()
+    builder.append(0, timestamp=9999999, key=key, value=value)
+
+    assert builder.size() - pos == size_in_bytes
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_estimate_size_in_bytes_bigger_than_batch(magic):
+    key = b"Super Key"
+    value = b"1" * 100
+    estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
+        magic, compression_type=0, key=key, value=value)
+
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+    builder.append(
+        0, timestamp=9999999, key=key, value=value)
+    buf = builder.build()
+    assert len(buf) <= estimate_size, \
+        "Estimate should always be upper bound"
diff --git a/test/record/test_records.py b/test/record/test_records.py
new file mode 100644
index 0000000..fc3eaca
--- /dev/null
+++ b/test/record/test_records.py
@@ -0,0 +1,108 @@
+import pytest
+from kafka.record import MemoryRecords
+from kafka.errors import CorruptRecordException
+
+record_batch_data_v1 = [
+    # First Message value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
+    b'\x00\x01^\x18g\xab\xae\xff\xff\xff\xff\x00\x00\x00\x03123',
+    # Second Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x16\xef\x98\xc9 \x01\x00'
+    b'\x00\x00\x01^\x18g\xaf\xc0\xff\xff\xff\xff\x00\x00\x00\x00',
+    # Third Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x16_\xaf\xfb^\x01\x00\x00'
+    b'\x00\x01^\x18g\xb0r\xff\xff\xff\xff\x00\x00\x00\x00',
+    # Fourth Message value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x19\xa8\x12W \x01\x00\x00'
+    b'\x00\x01^\x18g\xb8\x03\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+# This is real live data from Kafka 10 broker
+record_batch_data_v0 = [
+    # First Message value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+    b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123',
+    # Second Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+    b'\xff\xff\xff\x00\x00\x00\x00',
+    # Third Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+    b'\xff\xff\xff\x00\x00\x00\x00',
+    # Fourth Message value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+    b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+
+def test_memory_records_v1():
+    data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
+    records = MemoryRecords(data_bytes)
+
+    assert records.size_in_bytes() == 146
+    assert records.valid_bytes() == 142
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp == 1503648000942
+    assert recs[0].timestamp_type == 0
+    assert recs[0].checksum == 1199974594 & 0xffffffff
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
+def test_memory_records_v0():
+    data_bytes = b"".join(record_batch_data_v0)
+    records = MemoryRecords(data_bytes + b"\x00" * 4)
+
+    assert records.size_in_bytes() == 114
+    assert records.valid_bytes() == 110
+
+    records = MemoryRecords(data_bytes)
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp is None
+    assert recs[0].timestamp_type is None
+    assert recs[0].checksum == -22012481 & 0xffffffff
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
+def test_memory_records_corrupt():
+    records = MemoryRecords(b"")
+    assert records.size_in_bytes() == 0
+    assert records.valid_bytes() == 0
+    assert records.has_next() is False
+
+    records = MemoryRecords(b"\x00\x00\x00")
+    assert records.size_in_bytes() == 3
+    assert records.valid_bytes() == 0
+    assert records.has_next() is False
+
+    records = MemoryRecords(
+        b"\x00\x00\x00\x00\x00\x00\x00\x03"  # Offset=3
+        b"\x00\x00\x00\x03"  # Length=3
+        b"\xfe\xb0\x1d",  # Some random bytes
+    )
+    with pytest.raises(CorruptRecordException):
+        records.next_batch()
diff --git a/test/test_buffer.py b/test/test_buffer.py
deleted file mode 100644
index db6cbb3..0000000
--- a/test/test_buffer.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# pylint: skip-file
-from __future__ import absolute_import
-
-import io
-import platform
-
-import pytest
-
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.protocol.message import Message, MessageSet
-
-
-def test_buffer_close():
-    records = MessageSetBuffer(io.BytesIO(), 100000)
-    orig_msg = Message(b'foobar')
-    records.append(1234, orig_msg)
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 1234
-    assert msg == orig_msg
-
-    # Closing again should work fine
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 1234
-    assert msg == orig_msg
-
-
-@pytest.mark.parametrize('compression', [
-    'gzip',
-    'snappy',
-    pytest.mark.skipif(platform.python_implementation() == 'PyPy',
-                       reason='python-lz4 crashes on older versions of pypy')('lz4'),
-])
-def test_compressed_buffer_close(compression):
-    records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
-    orig_msg = Message(b'foobar')
-    records.append(1234, orig_msg)
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 0
-    assert msg.is_compressed()
-
-    msgset = msg.decompress()
-    (offset, size, msg) = msgset[0]
-    assert not msg.is_compressed()
-    assert offset == 1234
-    assert msg == orig_msg
-
-    # Closing again should work fine
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 0
-    assert msg.is_compressed()
-
-    msgset = msg.decompress()
-    (offset, size, msg) = msgset[0]
-    assert not msg.is_compressed()
-    assert offset == 1234
-    assert msg == orig_msg
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17e7401..d1843b3 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -26,6 +26,8 @@ from test.testutil import (
 
 
 class TestConsumerIntegration(KafkaIntegrationTestCase):
+    maxDiff = None
+
     @classmethod
     def setUpClass(cls):
         if not os.environ.get('KAFKA_VERSION'):
@@ -648,10 +650,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         kafka_producer = self.kafka_producer()
         early_msg = kafka_producer.send(
             self.topic, partition=0, value=b"first",
-            timestamp_ms=early_time).get()
+            timestamp_ms=early_time).get(1)
         late_msg = kafka_producer.send(
             self.topic, partition=0, value=b"last",
-            timestamp_ms=late_time).get()
+            timestamp_ms=late_time).get(1)
 
         consumer = self.kafka_consumer()
         offsets = consumer.offsets_for_times({tp: early_time})
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0203614..d963650 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -260,13 +260,14 @@ def test_decode_fetch_response_partial():
         struct.pack('>i', 8),  # Length of value
         b'ar',  # Value (truncated)
     ])
-
     resp = FetchResponse[0].decode(io.BytesIO(encoded))
     assert len(resp.topics) == 1
     topic, partitions = resp.topics[0]
     assert topic == 'foobar'
     assert len(partitions) == 2
-    m1 = partitions[0][3]
+
+    m1 = MessageSet.decode(
+        partitions[0][3], bytes_to_read=len(partitions[0][3]))
     assert len(m1) == 2
     assert m1[1] == (None, None, PartialMessage())
 
diff --git a/test/test_sender.py b/test/test_sender.py
index f37e194..2a68def 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -1,20 +1,17 @@
 # pylint: skip-file
 from __future__ import absolute_import
 
-import io
-
 import pytest
+import io
 
 from kafka.client_async import KafkaClient
 from kafka.cluster import ClusterMetadata
-import kafka.errors as Errors
-from kafka.future import Future
 from kafka.metrics import Metrics
-from kafka.producer.buffer import MessageSetBuffer
 from kafka.protocol.produce import ProduceRequest
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
 from kafka.producer.sender import Sender
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition
 
 
 @pytest.fixture
@@ -47,7 +44,10 @@ def sender(client, accumulator, metrics):
 def test_produce_request(sender, mocker, api_version, produce_version):
     sender.config['api_version'] = api_version
     tp = TopicPartition('foo', 0)
-    records = MessageSetBuffer(io.BytesIO(), 100000)
-    batch = RecordBatch(tp, records)
+    buffer = io.BytesIO()
+    records = MemoryRecordsBuilder(
+        magic=1, compression_type=0, batch_size=100000)
+    batch = ProducerBatch(tp, records, buffer)
+    records.close()
     produce_request = sender._produce_request(0, 0, 0, [batch])
     assert isinstance(produce_request, ProduceRequest[produce_version])
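
On the producer side, the refactor swaps MessageSetBuffer/RecordBatch for MemoryRecordsBuilder/ProducerBatch, which is what the updated test_sender.py constructs. A short sketch mirroring that test (names and arguments are taken from the diff above; the sender and accumulator wiring is omitted):

    import io
    from kafka.producer.record_accumulator import ProducerBatch
    from kafka.record.memory_records import MemoryRecordsBuilder
    from kafka.structs import TopicPartition

    tp = TopicPartition('foo', 0)
    buffer = io.BytesIO()
    # Record data is now accumulated through MemoryRecordsBuilder instead of a
    # raw MessageSetBuffer; close() finalizes the in-memory batch.
    records = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=100000)
    batch = ProducerBatch(tp, records, buffer)
    records.close()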