path: root/test
author      Taras <voyn1991@gmail.com>    2017-10-10 00:13:16 +0300
committer   Taras <voyn1991@gmail.com>    2017-10-11 18:09:17 +0300
commit      fbea5f04bccd28f3aa15a1711548b131504591ac (patch)
tree        1c8a0efe687c2ace72fa146b4f03e15def8e3a95 /test
parent      f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
download    kafka-python-fbea5f04bccd28f3aa15a1711548b131504591ac.tar.gz
Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
Diffstat (limited to 'test')
-rw-r--r--  test/record/test_legacy_records.py     85
-rw-r--r--  test/record/test_records.py           108
-rw-r--r--  test/test_buffer.py                    72
-rw-r--r--  test/test_consumer_integration.py       6
-rw-r--r--  test/test_protocol.py                   5
-rw-r--r--  test/test_sender.py                    18
6 files changed, 209 insertions(+), 85 deletions(-)
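
The two new test modules below exercise the record-layer API this refactor introduces (kafka.record.legacy_records and kafka.record). For orientation only, and not part of the commit itself, a minimal round trip through the v0/v1 builder and reader, using just the calls those tests rely on, looks like this:

# Orientation sketch (not part of the commit): write and read back a
# magic=1 message set via the API exercised in test_legacy_records.py.
from kafka.record.legacy_records import (
    LegacyRecordBatch, LegacyRecordBatchBuilder
)

builder = LegacyRecordBatchBuilder(
    magic=1, compression_type=0, batch_size=1024 * 1024)
builder.append(0, timestamp=1503648000942, key=b"key", value=b"value")
data = builder.build()                     # serialized legacy message set

batch = LegacyRecordBatch(bytes(data), 1)  # reader takes the same magic value
for record in batch:
    print(record.offset, record.timestamp, record.key, record.value)
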
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
new file mode 100644
index 0000000..2d76695
--- /dev/null
+++ b/test/record/test_legacy_records.py
@@ -0,0 +1,85 @@
+import pytest
+from kafka.record.legacy_records import (
+    LegacyRecordBatch, LegacyRecordBatchBuilder
+)
+from kafka.protocol.message import Message
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_no_compression(magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+    builder.append(
+        0, timestamp=9999999, key=b"test", value=b"Super")
+    buffer = builder.build()
+
+    batch = LegacyRecordBatch(bytes(buffer), magic)
+    msgs = list(batch)
+    assert len(msgs) == 1
+    msg = msgs[0]
+
+    assert msg.offset == 0
+    assert msg.timestamp == (9999999 if magic else None)
+    assert msg.timestamp_type == (0 if magic else None)
+    assert msg.key == b"test"
+    assert msg.value == b"Super"
+    assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff
+
+
+@pytest.mark.parametrize("compression_type", [
+    Message.CODEC_GZIP,
+    Message.CODEC_SNAPPY,
+    Message.CODEC_LZ4
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=compression_type, batch_size=9999999)
+    for offset in range(10):
+        builder.append(
+            offset, timestamp=9999999, key=b"test", value=b"Super")
+    buffer = builder.build()
+
+    batch = LegacyRecordBatch(bytes(buffer), magic)
+    msgs = list(batch)
+
+    expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
+    for offset, msg in enumerate(msgs):
+        assert msg.offset == offset
+        assert msg.timestamp == (9999999 if magic else None)
+        assert msg.timestamp_type == (0 if magic else None)
+        assert msg.key == b"test"
+        assert msg.value == b"Super"
+        assert msg.checksum == expected_checksum
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_written_bytes_equals_size_in_bytes(magic):
+    key = b"test"
+    value = b"Super"
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+
+    size_in_bytes = builder.size_in_bytes(
+        0, timestamp=9999999, key=key, value=value)
+
+    pos = builder.size()
+    builder.append(0, timestamp=9999999, key=key, value=value)
+
+    assert builder.size() - pos == size_in_bytes
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_estimate_size_in_bytes_bigger_than_batch(magic):
+    key = b"Super Key"
+    value = b"1" * 100
+    estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
+        magic, compression_type=0, key=key, value=value)
+
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=9999999)
+    builder.append(
+        0, timestamp=9999999, key=key, value=value)
+    buf = builder.build()
+    assert len(buf) <= estimate_size, \
+        "Estimate should always be upper bound"
diff --git a/test/record/test_records.py b/test/record/test_records.py
new file mode 100644
index 0000000..fc3eaca
--- /dev/null
+++ b/test/record/test_records.py
@@ -0,0 +1,108 @@
+import pytest
+from kafka.record import MemoryRecords
+from kafka.errors import CorruptRecordException
+
+record_batch_data_v1 = [
+    # First Message value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
+    b'\x00\x01^\x18g\xab\xae\xff\xff\xff\xff\x00\x00\x00\x03123',
+    # Second Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x16\xef\x98\xc9 \x01\x00'
+    b'\x00\x00\x01^\x18g\xaf\xc0\xff\xff\xff\xff\x00\x00\x00\x00',
+    # Third Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x16_\xaf\xfb^\x01\x00\x00'
+    b'\x00\x00\x01^\x18g\xb0r\xff\xff\xff\xff\x00\x00\x00\x00',
+    # Fourth Message value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x19\xa8\x12W \x01\x00\x00'
+    b'\x00\x01^\x18g\xb8\x03\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+# This is real live data from a Kafka 0.10 broker
+record_batch_data_v0 = [
+    # First Message value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+    b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123',
+    # Second Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+    b'\xff\xff\xff\x00\x00\x00\x00',
+    # Third Message value == ""
+    b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+    b'\xff\xff\xff\x00\x00\x00\x00',
+    # Fourth Message value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+    b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+
+def test_memory_records_v1():
+    data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
+    records = MemoryRecords(data_bytes)
+
+    assert records.size_in_bytes() == 146
+    assert records.valid_bytes() == 142
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp == 1503648000942
+    assert recs[0].timestamp_type == 0
+    assert recs[0].checksum == 1199974594 & 0xffffffff
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
+def test_memory_records_v0():
+    data_bytes = b"".join(record_batch_data_v0)
+    records = MemoryRecords(data_bytes + b"\x00" * 4)
+
+    assert records.size_in_bytes() == 114
+    assert records.valid_bytes() == 110
+
+    records = MemoryRecords(data_bytes)
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp is None
+    assert recs[0].timestamp_type is None
+    assert recs[0].checksum == -22012481 & 0xffffffff
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
+def test_memory_records_corrupt():
+    records = MemoryRecords(b"")
+    assert records.size_in_bytes() == 0
+    assert records.valid_bytes() == 0
+    assert records.has_next() is False
+
+    records = MemoryRecords(b"\x00\x00\x00")
+    assert records.size_in_bytes() == 3
+    assert records.valid_bytes() == 0
+    assert records.has_next() is False
+
+    records = MemoryRecords(
+        b"\x00\x00\x00\x00\x00\x00\x00\x03"  # Offset=3
+        b"\x00\x00\x00\x03"  # Length=3
+        b"\xfe\xb0\x1d",  # Some random bytes
+    )
+    with pytest.raises(CorruptRecordException):
+        records.next_batch()
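
For orientation only (again not part of the commit), the MemoryRecords reader tested above is drained batch by batch; a minimal consumption loop using just the calls shown in test_records.py:

# Orientation sketch (not part of the commit): drain a MemoryRecords
# buffer the way test_memory_records_v0/v1 above read it.
from kafka.record import MemoryRecords

def drain(data_bytes):
    records = MemoryRecords(data_bytes)
    out = []
    while records.has_next():
        batch = records.next_batch()  # returns a batch while has_next() is True
        for record in batch:
            out.append((record.key, record.value, record.timestamp))
    return out
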
diff --git a/test/test_buffer.py b/test/test_buffer.py
deleted file mode 100644
index db6cbb3..0000000
--- a/test/test_buffer.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# pylint: skip-file
-from __future__ import absolute_import
-
-import io
-import platform
-
-import pytest
-
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.protocol.message import Message, MessageSet
-
-
-def test_buffer_close():
-    records = MessageSetBuffer(io.BytesIO(), 100000)
-    orig_msg = Message(b'foobar')
-    records.append(1234, orig_msg)
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 1234
-    assert msg == orig_msg
-
-    # Closing again should work fine
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 1234
-    assert msg == orig_msg
-
-
-@pytest.mark.parametrize('compression', [
-    'gzip',
-    'snappy',
-    pytest.mark.skipif(platform.python_implementation() == 'PyPy',
-                       reason='python-lz4 crashes on older versions of pypy')('lz4'),
-])
-def test_compressed_buffer_close(compression):
-    records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
-    orig_msg = Message(b'foobar')
-    records.append(1234, orig_msg)
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 0
-    assert msg.is_compressed()
-
-    msgset = msg.decompress()
-    (offset, size, msg) = msgset[0]
-    assert not msg.is_compressed()
-    assert offset == 1234
-    assert msg == orig_msg
-
-    # Closing again should work fine
-    records.close()
-
-    msgset = MessageSet.decode(records.buffer())
-    assert len(msgset) == 1
-    (offset, size, msg) = msgset[0]
-    assert offset == 0
-    assert msg.is_compressed()
-
-    msgset = msg.decompress()
-    (offset, size, msg) = msgset[0]
-    assert not msg.is_compressed()
-    assert offset == 1234
-    assert msg == orig_msg
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17e7401..d1843b3 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -26,6 +26,8 @@ from test.testutil import (
 class TestConsumerIntegration(KafkaIntegrationTestCase):
+    maxDiff = None
+
     @classmethod
     def setUpClass(cls):
         if not os.environ.get('KAFKA_VERSION'):
@@ -648,10 +650,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         kafka_producer = self.kafka_producer()
         early_msg = kafka_producer.send(
             self.topic, partition=0, value=b"first",
-            timestamp_ms=early_time).get()
+            timestamp_ms=early_time).get(1)
         late_msg = kafka_producer.send(
             self.topic, partition=0, value=b"last",
-            timestamp_ms=late_time).get()
+            timestamp_ms=late_time).get(1)

         consumer = self.kafka_consumer()
         offsets = consumer.offsets_for_times({tp: early_time})
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0203614..d963650 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -260,13 +260,14 @@ def test_decode_fetch_response_partial():
         struct.pack('>i', 8),  # Length of value
         b'ar',  # Value (truncated)
     ])
-
     resp = FetchResponse[0].decode(io.BytesIO(encoded))
     assert len(resp.topics) == 1
     topic, partitions = resp.topics[0]
     assert topic == 'foobar'
     assert len(partitions) == 2
-    m1 = partitions[0][3]
+
+    m1 = MessageSet.decode(
+        partitions[0][3], bytes_to_read=len(partitions[0][3]))
     assert len(m1) == 2
     assert m1[1] == (None, None, PartialMessage())
diff --git a/test/test_sender.py b/test/test_sender.py
index f37e194..2a68def 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -1,20 +1,17 @@
 # pylint: skip-file
 from __future__ import absolute_import

-import io
-
 import pytest
+import io

 from kafka.client_async import KafkaClient
 from kafka.cluster import ClusterMetadata
-import kafka.errors as Errors
-from kafka.future import Future
 from kafka.metrics import Metrics
-from kafka.producer.buffer import MessageSetBuffer
 from kafka.protocol.produce import ProduceRequest
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
 from kafka.producer.sender import Sender
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition


 @pytest.fixture
@@ -47,7 +44,10 @@ def sender(client, accumulator, metrics):
 def test_produce_request(sender, mocker, api_version, produce_version):
     sender.config['api_version'] = api_version
     tp = TopicPartition('foo', 0)
-    records = MessageSetBuffer(io.BytesIO(), 100000)
-    batch = RecordBatch(tp, records)
+    buffer = io.BytesIO()
+    records = MemoryRecordsBuilder(
+        magic=1, compression_type=0, batch_size=100000)
+    batch = ProducerBatch(tp, records, buffer)
+    records.close()
     produce_request = sender._produce_request(0, 0, 0, [batch])
     assert isinstance(produce_request, ProduceRequest[produce_version])