author     Taras Voinarovskyi <voyn1991@gmail.com>  2017-10-14 23:06:27 +0300
committer  GitHub <noreply@github.com>              2017-10-14 23:06:27 +0300
commit     fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (patch)
tree       52e5860b1f8738b15e7c757c205961b761badd2b /test
parent     dd8e33654f2270097d6c1373dc272153670e48f8 (diff)
parent     365cae02da59721df77923bb5f5a2d94a84b2e83 (diff)
Merge pull request #1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
Diffstat (limited to 'test')
-rw-r--r--  test/record/test_legacy_records.py    85
-rw-r--r--  test/record/test_records.py          108
-rw-r--r--  test/test_buffer.py                   72
-rw-r--r--  test/test_consumer_integration.py      6
-rw-r--r--  test/test_fetcher.py                 122
-rw-r--r--  test/test_protocol.py                  5
-rw-r--r--  test/test_sender.py                   18
7 files changed, 236 insertions(+), 180 deletions(-)
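
Note: this merge replaces the old MessageSet/Message protocol classes with the
record API under kafka.record. A minimal round-trip sketch of the builder and
reader that the new tests below exercise; constructor arguments and attribute
names are taken from test/record/test_legacy_records.py, and the timestamp
value is arbitrary:

    from kafka.record.legacy_records import (
        LegacyRecordBatch, LegacyRecordBatchBuilder
    )

    # Build a v1 (magic=1) uncompressed batch containing a single record.
    builder = LegacyRecordBatchBuilder(
        magic=1, compression_type=0, batch_size=1024 * 1024)
    builder.append(0, timestamp=9999999, key=b"test", value=b"Super")
    buf = builder.build()

    # Read it back: the batch is iterable and yields records exposing
    # offset, timestamp, timestamp_type, key, value and checksum.
    batch = LegacyRecordBatch(bytes(buf), 1)
    for record in batch:
        print(record.offset, record.timestamp, record.key, record.value)
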
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
new file mode 100644
index 0000000..2d76695
--- /dev/null
+++ b/test/record/test_legacy_records.py
@@ -0,0 +1,85 @@
+import pytest
+from kafka.record.legacy_records import (
+ LegacyRecordBatch, LegacyRecordBatchBuilder
+)
+from kafka.protocol.message import Message
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_no_compression(magic):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=9999999)
+ builder.append(
+ 0, timestamp=9999999, key=b"test", value=b"Super")
+ buffer = builder.build()
+
+ batch = LegacyRecordBatch(bytes(buffer), magic)
+ msgs = list(batch)
+ assert len(msgs) == 1
+ msg = msgs[0]
+
+ assert msg.offset == 0
+ assert msg.timestamp == (9999999 if magic else None)
+ assert msg.timestamp_type == (0 if magic else None)
+ assert msg.key == b"test"
+ assert msg.value == b"Super"
+ assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff
+
+
+@pytest.mark.parametrize("compression_type", [
+ Message.CODEC_GZIP,
+ Message.CODEC_SNAPPY,
+ Message.CODEC_LZ4
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type, batch_size=9999999)
+ for offset in range(10):
+ builder.append(
+ offset, timestamp=9999999, key=b"test", value=b"Super")
+ buffer = builder.build()
+
+ batch = LegacyRecordBatch(bytes(buffer), magic)
+ msgs = list(batch)
+
+ expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
+ for offset, msg in enumerate(msgs):
+ assert msg.offset == offset
+ assert msg.timestamp == (9999999 if magic else None)
+ assert msg.timestamp_type == (0 if magic else None)
+ assert msg.key == b"test"
+ assert msg.value == b"Super"
+ assert msg.checksum == expected_checksum
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_written_bytes_equals_size_in_bytes(magic):
+ key = b"test"
+ value = b"Super"
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=9999999)
+
+ size_in_bytes = builder.size_in_bytes(
+ 0, timestamp=9999999, key=key, value=value)
+
+ pos = builder.size()
+ builder.append(0, timestamp=9999999, key=key, value=value)
+
+ assert builder.size() - pos == size_in_bytes
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_estimate_size_in_bytes_bigger_than_batch(magic):
+ key = b"Super Key"
+ value = b"1" * 100
+ estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, compression_type=0, key=key, value=value)
+
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=9999999)
+ builder.append(
+ 0, timestamp=9999999, key=key, value=value)
+ buf = builder.build()
+ assert len(buf) <= estimate_size, \
+ "Estimate should always be upper bound"
diff --git a/test/record/test_records.py b/test/record/test_records.py
new file mode 100644
index 0000000..fc3eaca
--- /dev/null
+++ b/test/record/test_records.py
@@ -0,0 +1,108 @@
+import pytest
+from kafka.record import MemoryRecords
+from kafka.errors import CorruptRecordException
+
+record_batch_data_v1 = [
+ # First Message value == "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
+ b'\x00\x01^\x18g\xab\xae\xff\xff\xff\xff\x00\x00\x00\x03123',
+ # Second Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x16\xef\x98\xc9 \x01\x00'
+ b'\x00\x00\x01^\x18g\xaf\xc0\xff\xff\xff\xff\x00\x00\x00\x00',
+ # Third Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x16_\xaf\xfb^\x01\x00\x00'
+ b'\x00\x01^\x18g\xb0r\xff\xff\xff\xff\x00\x00\x00\x00',
+ # Fourth Message value = "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x19\xa8\x12W \x01\x00\x00'
+ b'\x00\x01^\x18g\xb8\x03\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+# This is real live data from Kafka 10 broker
+record_batch_data_v0 = [
+ # First Message value == "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+ b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123',
+ # Second Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+ b'\xff\xff\xff\x00\x00\x00\x00',
+ # Third Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+ b'\xff\xff\xff\x00\x00\x00\x00',
+ # Fourth Message value = "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+ b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+
+def test_memory_records_v1():
+ data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
+ records = MemoryRecords(data_bytes)
+
+ assert records.size_in_bytes() == 146
+ assert records.valid_bytes() == 142
+
+ assert records.has_next() is True
+ batch = records.next_batch()
+ recs = list(batch)
+ assert len(recs) == 1
+ assert recs[0].value == b"123"
+ assert recs[0].key is None
+ assert recs[0].timestamp == 1503648000942
+ assert recs[0].timestamp_type == 0
+ assert recs[0].checksum == 1199974594 & 0xffffffff
+
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+
+ assert records.has_next() is False
+ assert records.next_batch() is None
+ assert records.next_batch() is None
+
+
+def test_memory_records_v0():
+ data_bytes = b"".join(record_batch_data_v0)
+ records = MemoryRecords(data_bytes + b"\x00" * 4)
+
+ assert records.size_in_bytes() == 114
+ assert records.valid_bytes() == 110
+
+ records = MemoryRecords(data_bytes)
+
+ assert records.has_next() is True
+ batch = records.next_batch()
+ recs = list(batch)
+ assert len(recs) == 1
+ assert recs[0].value == b"123"
+ assert recs[0].key is None
+ assert recs[0].timestamp is None
+ assert recs[0].timestamp_type is None
+ assert recs[0].checksum == -22012481 & 0xffffffff
+
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+
+ assert records.has_next() is False
+ assert records.next_batch() is None
+ assert records.next_batch() is None
+
+
+def test_memory_records_corrupt():
+ records = MemoryRecords(b"")
+ assert records.size_in_bytes() == 0
+ assert records.valid_bytes() == 0
+ assert records.has_next() is False
+
+ records = MemoryRecords(b"\x00\x00\x00")
+ assert records.size_in_bytes() == 3
+ assert records.valid_bytes() == 0
+ assert records.has_next() is False
+
+ records = MemoryRecords(
+ b"\x00\x00\x00\x00\x00\x00\x00\x03" # Offset=3
+ b"\x00\x00\x00\x03" # Length=3
+ b"\xfe\xb0\x1d", # Some random bytes
+ )
+ with pytest.raises(CorruptRecordException):
+ records.next_batch()
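
Note: test_records.py above pins down the consumer-side read path. A condensed
consumption sketch using only the calls the tests exercise; a truncated tail is
excluded from valid_bytes(), and a batch whose length field is inconsistent
raises CorruptRecordException:

    from kafka.record import MemoryRecords
    from kafka.errors import CorruptRecordException

    def iter_records(data_bytes):
        records = MemoryRecords(data_bytes)
        while records.has_next():
            # May raise CorruptRecordException on malformed data.
            batch = records.next_batch()
            for record in batch:
                yield record.offset, record.key, record.value
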
diff --git a/test/test_buffer.py b/test/test_buffer.py
deleted file mode 100644
index db6cbb3..0000000
--- a/test/test_buffer.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# pylint: skip-file
-from __future__ import absolute_import
-
-import io
-import platform
-
-import pytest
-
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.protocol.message import Message, MessageSet
-
-
-def test_buffer_close():
- records = MessageSetBuffer(io.BytesIO(), 100000)
- orig_msg = Message(b'foobar')
- records.append(1234, orig_msg)
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 1234
- assert msg == orig_msg
-
- # Closing again should work fine
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 1234
- assert msg == orig_msg
-
-
-@pytest.mark.parametrize('compression', [
- 'gzip',
- 'snappy',
- pytest.mark.skipif(platform.python_implementation() == 'PyPy',
- reason='python-lz4 crashes on older versions of pypy')('lz4'),
-])
-def test_compressed_buffer_close(compression):
- records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
- orig_msg = Message(b'foobar')
- records.append(1234, orig_msg)
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 0
- assert msg.is_compressed()
-
- msgset = msg.decompress()
- (offset, size, msg) = msgset[0]
- assert not msg.is_compressed()
- assert offset == 1234
- assert msg == orig_msg
-
- # Closing again should work fine
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 0
- assert msg.is_compressed()
-
- msgset = msg.decompress()
- (offset, size, msg) = msgset[0]
- assert not msg.is_compressed()
- assert offset == 1234
- assert msg == orig_msg
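
Note: test_buffer.py is deleted because MessageSetBuffer (and the MessageSet
round-trip it tested) is superseded by MemoryRecordsBuilder. A hedged sketch of
the equivalent producer-side flow, limited to the builder calls that appear
elsewhere in this diff:

    from kafka.record.memory_records import MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(
        magic=1, compression_type=0, batch_size=100000)
    for value in (b"first", b"second"):
        builder.append(key=None, value=value, timestamp=None)
    builder.close()             # finalize the batch; no further appends
    payload = builder.buffer()  # serialized bytes, as used by the
                                # _build_record_batch helper in test_fetcher.py below
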
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17e7401..d1843b3 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -26,6 +26,8 @@ from test.testutil import (
class TestConsumerIntegration(KafkaIntegrationTestCase):
+ maxDiff = None
+
@classmethod
def setUpClass(cls):
if not os.environ.get('KAFKA_VERSION'):
@@ -648,10 +650,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
kafka_producer = self.kafka_producer()
early_msg = kafka_producer.send(
self.topic, partition=0, value=b"first",
- timestamp_ms=early_time).get()
+ timestamp_ms=early_time).get(1)
late_msg = kafka_producer.send(
self.topic, partition=0, value=b"last",
- timestamp_ms=late_time).get()
+ timestamp_ms=late_time).get(1)
consumer = self.kafka_consumer()
offsets = consumer.offsets_for_times({tp: early_time})
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 5da597c..364a808 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,22 +8,20 @@ import itertools
import time
from kafka.client_async import KafkaClient
-from kafka.codec import gzip_encode
from kafka.consumer.fetcher import (
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
)
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
-from kafka.protocol.types import Int64, Int32
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
+from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
@pytest.fixture
@@ -51,6 +49,16 @@ def fetcher(client, subscription_state, topic):
return Fetcher(client, subscription_state, Metrics())
+def _build_record_batch(msgs, compression=0):
+ builder = MemoryRecordsBuilder(
+ magic=1, compression_type=0, batch_size=9999999)
+ for msg in msgs:
+ key, value, timestamp = msg
+ builder.append(key=key, value=value, timestamp=timestamp)
+ builder.close()
+ return builder.buffer()
+
+
def test_send_fetches(fetcher, topic, mocker):
fetch_requests = [
FetchRequest[0](
@@ -321,12 +329,12 @@ def test_partition_records_offset():
def test_fetched_records(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
+
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
@@ -401,11 +409,12 @@ def test__unpack_message_set(fetcher):
fetcher.config['check_crcs'] = False
tp = TopicPartition('foo', 0)
messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c'))
+ (None, b"a", None),
+ (None, b"b", None),
+ (None, b"c", None),
]
- records = list(fetcher._unpack_message_set(tp, messages))
+ memory_records = MemoryRecords(_build_record_batch(messages))
+ records = list(fetcher._unpack_message_set(tp, memory_records))
assert len(records) == 3
assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
assert records[0].value == b'a'
@@ -416,88 +425,14 @@ def test__unpack_message_set(fetcher):
assert records[2].offset == 2
-def test__unpack_message_set_compressed_v0(fetcher):
- fetcher.config['check_crcs'] = False
- tp = TopicPartition('foo', 0)
- messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c')),
- ]
- message_bytes = []
- for offset, _, m in messages:
- encoded = m.encode()
- message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
- compressed_bytes = gzip_encode(b''.join(message_bytes))
- compressed_base_offset = 0
- compressed_msgs = [
- (compressed_base_offset, None,
- Message(compressed_bytes,
- magic=0,
- attributes=Message.CODEC_GZIP))
- ]
- records = list(fetcher._unpack_message_set(tp, compressed_msgs))
- assert len(records) == 3
- assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
- assert records[0].value == b'a'
- assert records[1].value == b'b'
- assert records[2].value == b'c'
- assert records[0].offset == 0
- assert records[1].offset == 1
- assert records[2].offset == 2
-
-
-def test__unpack_message_set_compressed_v1(fetcher):
- fetcher.config['check_crcs'] = False
- tp = TopicPartition('foo', 0)
- messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c')),
- ]
- message_bytes = []
- for offset, _, m in messages:
- encoded = m.encode()
- message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
- compressed_bytes = gzip_encode(b''.join(message_bytes))
- compressed_base_offset = 10
- compressed_msgs = [
- (compressed_base_offset, None,
- Message(compressed_bytes,
- magic=1,
- attributes=Message.CODEC_GZIP))
- ]
- records = list(fetcher._unpack_message_set(tp, compressed_msgs))
- assert len(records) == 3
- assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
- assert records[0].value == b'a'
- assert records[1].value == b'b'
- assert records[2].value == b'c'
- assert records[0].offset == 8
- assert records[1].offset == 9
- assert records[2].offset == 10
-
-
-def test__parse_record(fetcher):
- tp = TopicPartition('foo', 0)
- record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
- assert record.topic == 'foo'
- assert record.partition == 0
- assert record.offset == 123
- assert record.timestamp == 456
- assert record.value == b'abc'
- assert record.key is None
-
-
def test__message_generator(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
@@ -513,10 +448,9 @@ def test__parse_fetched_data(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
partition_record = fetcher._parse_fetched_data(completed_fetch)
@@ -529,10 +463,9 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._subscriptions.pause(tp)
@@ -545,10 +478,9 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 10, 0, [0, 100, msgs],
+ tp, 10, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
partition_record = fetcher._parse_fetched_data(completed_fetch)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0203614..d963650 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -260,13 +260,14 @@ def test_decode_fetch_response_partial():
struct.pack('>i', 8), # Length of value
b'ar', # Value (truncated)
])
-
resp = FetchResponse[0].decode(io.BytesIO(encoded))
assert len(resp.topics) == 1
topic, partitions = resp.topics[0]
assert topic == 'foobar'
assert len(partitions) == 2
- m1 = partitions[0][3]
+
+ m1 = MessageSet.decode(
+ partitions[0][3], bytes_to_read=len(partitions[0][3]))
assert len(m1) == 2
assert m1[1] == (None, None, PartialMessage())
diff --git a/test/test_sender.py b/test/test_sender.py
index f37e194..2a68def 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -1,20 +1,17 @@
# pylint: skip-file
from __future__ import absolute_import
-import io
-
import pytest
+import io
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
-import kafka.errors as Errors
-from kafka.future import Future
from kafka.metrics import Metrics
-from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
from kafka.producer.sender import Sender
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition
@pytest.fixture
@@ -47,7 +44,10 @@ def sender(client, accumulator, metrics):
def test_produce_request(sender, mocker, api_version, produce_version):
sender.config['api_version'] = api_version
tp = TopicPartition('foo', 0)
- records = MessageSetBuffer(io.BytesIO(), 100000)
- batch = RecordBatch(tp, records)
+ buffer = io.BytesIO()
+ records = MemoryRecordsBuilder(
+ magic=1, compression_type=0, batch_size=100000)
+ batch = ProducerBatch(tp, records, buffer)
+ records.close()
produce_request = sender._produce_request(0, 0, 0, [batch])
assert isinstance(produce_request, ProduceRequest[produce_version])
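
Note: the updated sender test above captures the new producer wiring:
ProducerBatch now wraps a MemoryRecordsBuilder where RecordBatch previously
wrapped a MessageSetBuffer. A condensed sketch of that construction, using only
names that appear in the hunk above (the client/accumulator/metrics fixtures
and Sender setup are elided):

    import io

    from kafka.producer.record_accumulator import ProducerBatch
    from kafka.record.memory_records import MemoryRecordsBuilder
    from kafka.structs import TopicPartition

    tp = TopicPartition('foo', 0)
    records = MemoryRecordsBuilder(
        magic=1, compression_type=0, batch_size=100000)
    batch = ProducerBatch(tp, records, io.BytesIO())
    records.close()  # seal the records before building a ProduceRequest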