author    | Taras Voinarovskyi <voyn1991@gmail.com>          | 2017-10-22 16:56:28 +0900
committer | GitHub <noreply@github.com>                      | 2017-10-22 16:56:28 +0900
commit    | a345dcd2ca1b0f8934864c512a4a78c65034dd36 (patch)
tree      | 0b7ea8c67b015f944b9a401f5e024a2eff7c7db9
parent    | 4dbf34abce9b4addbb304520e2f692fbaef60ae5 (diff)
download  | kafka-python-a345dcd2ca1b0f8934864c512a4a78c65034dd36.tar.gz
Fix timestamp not passed to RecordMetadata (#1273)
* Fix timestamp not being passed to RecordMetadata properly
* Add more tests for LegacyBatch
* Fix producer test for recordmetadata
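
In user-facing terms, the fix means the `RecordMetadata` returned from `producer.send(...).get()` now reflects the message timestamp. A minimal sketch of the expected behaviour (the bootstrap address and topic name below are placeholders, and a reachable broker is assumed):

```python
from kafka import KafkaProducer

# Placeholder bootstrap address; any reachable broker works for this sketch.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

future = producer.send("demo-topic", value=b"payload", timestamp_ms=9999999)
record = future.get(timeout=10)

# On brokers >= 0.10 (message format v1) the explicit timestamp_ms is now
# propagated into RecordMetadata; on older brokers record.timestamp is -1
# (NO_TIMESTAMP), matching the new test in test/test_producer.py.
print(record.timestamp)
```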
-rw-r--r-- | kafka/producer/future.py             |  4
-rw-r--r-- | kafka/producer/record_accumulator.py | 11
-rw-r--r-- | kafka/record/legacy_records.py       | 48
-rw-r--r-- | kafka/record/memory_records.py       |  9
-rw-r--r-- | test/record/test_legacy_records.py   | 93
-rw-r--r-- | test/test_producer.py                | 54
6 files changed, 195 insertions, 24 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index bc50d0d..e39a0a9 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -44,7 +44,9 @@ class FutureRecordMetadata(Future):
         (relative_offset, timestamp_ms, checksum,
          serialized_key_size, serialized_value_size) = self.args
 
-        if produce_timestamp_ms is not None:
+        # None is when Broker does not support the API (<0.10) and
+        # -1 is when the broker is configured for CREATE_TIME timestamps
+        if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
             timestamp_ms = produce_timestamp_ms
         if offset != -1 and relative_offset is not None:
             offset += relative_offset
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 716ae65..5158474 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -56,15 +56,14 @@ class ProducerBatch(object):
         return self.records.next_offset()
 
     def try_append(self, timestamp_ms, key, value):
-        offset = self.records.next_offset()
-        checksum, record_size = self.records.append(timestamp_ms, key, value)
-        if record_size == 0:
+        metadata = self.records.append(timestamp_ms, key, value)
+        if metadata is None:
             return None
 
-        self.max_record_size = max(self.max_record_size, record_size)
+        self.max_record_size = max(self.max_record_size, metadata.size)
         self.last_append = time.time()
-        future = FutureRecordMetadata(self.produce_future, offset,
-                                      timestamp_ms, checksum,
+        future = FutureRecordMetadata(self.produce_future, metadata.offset,
+                                      metadata.timestamp, metadata.crc,
                                       len(key) if key is not None else -1,
                                       len(value) if value is not None else -1)
         return future
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 98c8e30..055914c 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -110,6 +110,8 @@ class LegacyRecordBase(object):
     LOG_APPEND_TIME = 1
     CREATE_TIME = 0
 
+    NO_TIMESTAMP = -1
+
 
 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
 
@@ -333,10 +335,14 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
         # Check types
         if type(offset) != int:
             raise TypeError(offset)
-        if timestamp is None:
+        if self._magic == 0:
+            timestamp = self.NO_TIMESTAMP
+        elif timestamp is None:
             timestamp = int(time.time() * 1000)
         elif type(timestamp) != int:
-            raise TypeError(timestamp)
+            raise TypeError(
+                "`timestamp` should be int, but {} provided".format(
+                    type(timestamp)))
         if not (key is None or
                 isinstance(key, (bytes, bytearray, memoryview))):
             raise TypeError(
@@ -351,7 +357,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
         size = self.size_in_bytes(offset, timestamp, key, value)
         # We always allow at least one record to be appended
         if offset != 0 and pos + size >= self._batch_size:
-            return None, 0
+            return None
 
         # Allocate proper buffer length
         self._buffer.extend(bytearray(size))
@@ -359,7 +365,7 @@
         # Encode message
         crc = self._encode_msg(pos, offset, timestamp, key, value)
 
-        return crc, size
+        return LegacyRecordMetadata(offset, crc, size, timestamp)
 
     def _encode_msg(self, start_pos, offset, timestamp, key, value,
                     attributes=0):
@@ -484,3 +490,37 @@
                 cls.record_size(magic, key, value)
             )
         return cls.LOG_OVERHEAD + cls.record_size(magic, key, value)
+
+
+class LegacyRecordMetadata(object):
+
+    __slots__ = ("_crc", "_size", "_timestamp", "_offset")
+
+    def __init__(self, offset, crc, size, timestamp):
+        self._offset = offset
+        self._crc = crc
+        self._size = size
+        self._timestamp = timestamp
+
+    @property
+    def offset(self):
+        return self._offset
+
+    @property
+    def crc(self):
+        return self._crc
+
+    @property
+    def size(self):
+        return self._size
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    def __repr__(self):
+        return (
+            "LegacyRecordMetadata(offset={!r}, crc={!r}, size={!r},"
+            " timestamp={!r})".format(
+                self._offset, self._crc, self._size, self._timestamp)
+        )
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index c6a28be..4ed992c 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -131,14 +131,13 @@ class MemoryRecordsBuilder(object):
             return None, 0
 
         offset = self._next_offset
-        checksum, actual_size = self._builder.append(
-            offset, timestamp, key, value)
+        metadata = self._builder.append(offset, timestamp, key, value)
         # Return of 0 size means there's no space to add a new message
-        if actual_size == 0:
-            return None, 0
+        if metadata is None:
+            return None
 
         self._next_offset += 1
-        return checksum, actual_size
+        return metadata
 
     def close(self):
         # This method may be called multiple times on the same batch
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
index 2d76695..ffe8a35 100644
--- a/test/record/test_legacy_records.py
+++ b/test/record/test_legacy_records.py
@@ -1,8 +1,8 @@
+from __future__ import unicode_literals
 import pytest
 from kafka.record.legacy_records import (
     LegacyRecordBatch, LegacyRecordBatchBuilder
 )
-from kafka.protocol.message import Message
 
 
 @pytest.mark.parametrize("magic", [0, 1])
@@ -27,9 +27,9 @@ def test_read_write_serde_v0_v1_no_compression(magic):
 
 
 @pytest.mark.parametrize("compression_type", [
-    Message.CODEC_GZIP,
-    Message.CODEC_SNAPPY,
-    Message.CODEC_LZ4
+    LegacyRecordBatch.CODEC_GZIP,
+    LegacyRecordBatch.CODEC_SNAPPY,
+    LegacyRecordBatch.CODEC_LZ4
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
@@ -43,14 +43,14 @@ def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
     batch = LegacyRecordBatch(bytes(buffer), magic)
     msgs = list(batch)
 
-    expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
     for offset, msg in enumerate(msgs):
         assert msg.offset == offset
         assert msg.timestamp == (9999999 if magic else None)
         assert msg.timestamp_type == (0 if magic else None)
         assert msg.key == b"test"
         assert msg.value == b"Super"
-        assert msg.checksum == expected_checksum
+        assert msg.checksum == (-2095076219 if magic else 278251978) & \
+            0xffffffff
 
 
 @pytest.mark.parametrize("magic", [0, 1])
@@ -83,3 +83,84 @@ def test_estimate_size_in_bytes_bigger_than_batch(magic):
     buf = builder.build()
     assert len(buf) <= estimate_size, \
         "Estimate should always be upper bound"
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_legacy_batch_builder_validates_arguments(magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=1024 * 1024)
+
+    # Key should not be str
+    with pytest.raises(TypeError):
+        builder.append(
+            0, timestamp=9999999, key="some string", value=None)
+
+    # Value should not be str
+    with pytest.raises(TypeError):
+        builder.append(
+            0, timestamp=9999999, key=None, value="some string")
+
+    # Timestamp should be of proper type
+    if magic != 0:
+        with pytest.raises(TypeError):
+            builder.append(
+                0, timestamp="1243812793", key=None, value=b"some string")
+
+    # Offset of invalid type
+    with pytest.raises(TypeError):
+        builder.append(
+            "0", timestamp=9999999, key=None, value=b"some string")
+
+    # Ok to pass value as None
+    builder.append(
+        0, timestamp=9999999, key=b"123", value=None)
+
+    # Timestamp can be None
+    builder.append(
+        1, timestamp=None, key=None, value=b"some string")
+
+    # Ok to pass offsets in not incremental order. This should not happen thou
+    builder.append(
+        5, timestamp=9999999, key=b"123", value=None)
+
+    # in case error handling code fails to fix inner buffer in builder
+    assert len(builder.build()) == 119 if magic else 95
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_legacy_correct_metadata_response(magic):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=1024 * 1024)
+    meta = builder.append(
+        0, timestamp=9999999, key=b"test", value=b"Super")
+
+    assert meta.offset == 0
+    assert meta.timestamp == (9999999 if magic else -1)
+    assert meta.crc == (-2095076219 if magic else 278251978) & 0xffffffff
+    assert repr(meta) == (
+        "LegacyRecordMetadata(offset=0, crc={}, size={}, "
+        "timestamp={})".format(meta.crc, meta.size, meta.timestamp)
+    )
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_legacy_batch_size_limit(magic):
+    # First message can be added even if it's too big
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=1024)
+    meta = builder.append(0, timestamp=None, key=None, value=b"M" * 2000)
+    assert meta.size > 0
+    assert meta.crc is not None
+    assert meta.offset == 0
+    assert meta.timestamp is not None
+    assert len(builder.build()) > 2000
+
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=0, batch_size=1024)
+    meta = builder.append(0, timestamp=None, key=None, value=b"M" * 700)
+    assert meta is not None
+    meta = builder.append(1, timestamp=None, key=None, value=b"M" * 700)
+    assert meta is None
+    meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
+    assert meta is None
+    assert len(builder.build()) < 1000
diff --git a/test/test_producer.py b/test/test_producer.py
index 1f6608a..41bd52e 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,11 @@
 import gc
 import platform
-import sys
+import time
 import threading
 
 import pytest
 
-from kafka import KafkaConsumer, KafkaProducer
+from kafka import KafkaConsumer, KafkaProducer, TopicPartition
 from kafka.producer.buffer import SimpleBufferPool
 from test.conftest import version
 from test.testutil import random_string
@@ -78,3 +78,53 @@ def test_kafka_producer_gc_cleanup():
     del(producer)
     gc.collect()
     assert threading.active_count() == threads
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
+    producer = KafkaProducer(bootstrap_servers=connect_str,
+                             retries=5,
+                             max_block_ms=10000,
+                             compression_type=compression)
+    if producer.config['api_version'] >= (0, 10):
+        magic = 1
+    else:
+        magic = 0
+
+    topic = random_string(5)
+    future = producer.send(
+        topic,
+        value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+        partition=0)
+    record = future.get(timeout=5)
+    assert record is not None
+    assert record.topic == topic
+    assert record.partition == 0
+    assert record.topic_partition == TopicPartition(topic, 0)
+    assert record.offset == 0
+    if magic >= 1:
+        assert record.timestamp == 9999999
+    else:
+        assert record.timestamp == -1  # NO_TIMESTAMP
+
+    if magic == 1:
+        assert record.checksum == 1370034956
+    else:
+        assert record.checksum == 3296137851
+
+    assert record.serialized_key_size == 10
+    assert record.serialized_value_size == 12
+
+    # generated timestamp case is skipped for broker 0.9 and below
+    if magic == 0:
+        return
+
+    send_time = time.time() * 1000
+    future = producer.send(
+        topic,
+        value=b"Simple value", key=b"Simple key", timestamp_ms=None,
+        partition=0)
+    record = future.get(timeout=5)
+    assert abs(record.timestamp - send_time) <= 1000  # Allow 1s deviation
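
For reviewers, the internal contract change above can be exercised directly: `LegacyRecordBatchBuilder.append()` (and, through it, `MemoryRecordsBuilder.append()`) now returns a `LegacyRecordMetadata` object, or `None` when the batch has no room, instead of a `(checksum, size)` tuple. A rough standalone sketch of consuming the new return value (illustration only; inside the producer this path is driven by `ProducerBatch.try_append()`):

```python
from kafka.record.legacy_records import LegacyRecordBatchBuilder

builder = LegacyRecordBatchBuilder(
    magic=1, compression_type=0, batch_size=1024 * 1024)

metadata = builder.append(0, timestamp=9999999, key=b"key", value=b"value")
if metadata is None:
    # No room left in the batch (previously signalled by a size of 0).
    pass
else:
    # offset, crc, size and timestamp replace the old (checksum, size) tuple.
    print(metadata.offset, metadata.crc, metadata.size, metadata.timestamp)
```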