author     Taras Voinarovskyi <voyn1991@gmail.com>    2017-10-22 16:56:28 +0900
committer  GitHub <noreply@github.com>                2017-10-22 16:56:28 +0900
commit     a345dcd2ca1b0f8934864c512a4a78c65034dd36 (patch)
tree       0b7ea8c67b015f944b9a401f5e024a2eff7c7db9
parent     4dbf34abce9b4addbb304520e2f692fbaef60ae5 (diff)
download   kafka-python-a345dcd2ca1b0f8934864c512a4a78c65034dd36.tar.gz
Fix timestamp not passed to RecordMetadata (#1273)
* Fix timestamp not being passed to RecordMetadata properly
* Add more tests for LegacyBatch
* Fix producer test for recordmetadata
-rw-r--r--  kafka/producer/future.py              |  4
-rw-r--r--  kafka/producer/record_accumulator.py  | 11
-rw-r--r--  kafka/record/legacy_records.py        | 48
-rw-r--r--  kafka/record/memory_records.py        |  9
-rw-r--r--  test/record/test_legacy_records.py    | 93
-rw-r--r--  test/test_producer.py                 | 54
6 files changed, 195 insertions(+), 24 deletions(-)
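
For context before the hunks: the point of the fix is that the timestamp reported back through the send future should be the one actually written to the log, not whatever raw value happened to be cached client-side. A minimal sketch of the user-visible behaviour, assuming a reachable broker at localhost:9092 (the address and topic name are illustrative):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    future = producer.send("my-topic", value=b"payload", timestamp_ms=9999999)
    record = future.get(timeout=10)

    # On a 0.10+ broker configured for CREATE_TIME this now reports 9999999
    # (the value passed to send) instead of -1; with LOG_APPEND_TIME it is the
    # broker-assigned time; on a pre-0.10 broker it stays -1 (NO_TIMESTAMP).
    print(record.timestamp)
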
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index bc50d0d..e39a0a9 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -44,7 +44,9 @@ class FutureRecordMetadata(Future):
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size) = self.args
- if produce_timestamp_ms is not None:
+ # None is when Broker does not support the API (<0.10) and
+ # -1 is when the broker is configured for CREATE_TIME timestamps
+ if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
timestamp_ms = produce_timestamp_ms
if offset != -1 and relative_offset is not None:
offset += relative_offset
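
The rule the added lines encode, restated as a standalone helper (the function below is illustrative, not part of the library):

    def resolve_timestamp(produce_timestamp_ms, client_timestamp_ms):
        """Pick the timestamp to expose on RecordMetadata.

        The broker returns None when it predates the v2 produce API (< 0.10)
        and -1 when the topic is configured for CREATE_TIME; in both cases the
        timestamp recorded client-side is the one that was actually written.
        Only a LOG_APPEND_TIME response overrides it.
        """
        if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
            return produce_timestamp_ms
        return client_timestamp_ms
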
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 716ae65..5158474 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -56,15 +56,14 @@ class ProducerBatch(object):
return self.records.next_offset()
def try_append(self, timestamp_ms, key, value):
- offset = self.records.next_offset()
- checksum, record_size = self.records.append(timestamp_ms, key, value)
- if record_size == 0:
+ metadata = self.records.append(timestamp_ms, key, value)
+ if metadata is None:
return None
- self.max_record_size = max(self.max_record_size, record_size)
+ self.max_record_size = max(self.max_record_size, metadata.size)
self.last_append = time.time()
- future = FutureRecordMetadata(self.produce_future, offset,
- timestamp_ms, checksum,
+ future = FutureRecordMetadata(self.produce_future, metadata.offset,
+ metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
return future
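
The key change here is the timestamp argument to FutureRecordMetadata: it is now metadata.timestamp, the timestamp the records builder actually encoded, rather than the caller's raw timestamp_ms. self.records is a MemoryRecordsBuilder (see the following files), where the difference is easiest to see; a small runnable sketch, with constructor arguments assumed from the class's definition rather than taken from this diff:

    from kafka.record.memory_records import MemoryRecordsBuilder

    # With timestamp=None the legacy builder substitutes the current wall-clock
    # time; propagating metadata.timestamp is what lets RecordMetadata report
    # that generated value instead of None.
    records = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16384)
    metadata = records.append(timestamp=None, key=b"key", value=b"value")
    print(metadata.timestamp)  # roughly int(time.time() * 1000), not None
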
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 98c8e30..055914c 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -110,6 +110,8 @@ class LegacyRecordBase(object):
LOG_APPEND_TIME = 1
CREATE_TIME = 0
+ NO_TIMESTAMP = -1
+
class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
@@ -333,10 +335,14 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
# Check types
if type(offset) != int:
raise TypeError(offset)
- if timestamp is None:
+ if self._magic == 0:
+ timestamp = self.NO_TIMESTAMP
+ elif timestamp is None:
timestamp = int(time.time() * 1000)
elif type(timestamp) != int:
- raise TypeError(timestamp)
+ raise TypeError(
+ "`timestamp` should be int, but {} provided".format(
+ type(timestamp)))
if not (key is None or
isinstance(key, (bytes, bytearray, memoryview))):
raise TypeError(
@@ -351,7 +357,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
size = self.size_in_bytes(offset, timestamp, key, value)
# We always allow at least one record to be appended
if offset != 0 and pos + size >= self._batch_size:
- return None, 0
+ return None
# Allocate proper buffer length
self._buffer.extend(bytearray(size))
@@ -359,7 +365,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
# Encode message
crc = self._encode_msg(pos, offset, timestamp, key, value)
- return crc, size
+ return LegacyRecordMetadata(offset, crc, size, timestamp)
def _encode_msg(self, start_pos, offset, timestamp, key, value,
attributes=0):
@@ -484,3 +490,37 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
cls.record_size(magic, key, value)
)
return cls.LOG_OVERHEAD + cls.record_size(magic, key, value)
+
+
+class LegacyRecordMetadata(object):
+
+ __slots__ = ("_crc", "_size", "_timestamp", "_offset")
+
+ def __init__(self, offset, crc, size, timestamp):
+ self._offset = offset
+ self._crc = crc
+ self._size = size
+ self._timestamp = timestamp
+
+ @property
+ def offset(self):
+ return self._offset
+
+ @property
+ def crc(self):
+ return self._crc
+
+ @property
+ def size(self):
+ return self._size
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ def __repr__(self):
+ return (
+ "LegacyRecordMetadata(offset={!r}, crc={!r}, size={!r},"
+ " timestamp={!r})".format(
+ self._offset, self._crc, self._size, self._timestamp)
+ )
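
LegacyRecordMetadata is the value object that builder callers now receive instead of a (crc, size) tuple; the tests added below assert on exactly these fields. A minimal runnable usage sketch:

    from kafka.record.legacy_records import LegacyRecordBatchBuilder

    builder = LegacyRecordBatchBuilder(
        magic=1, compression_type=0, batch_size=1024 * 1024)
    meta = builder.append(0, timestamp=9999999, key=b"test", value=b"Super")

    # For magic 1 the builder echoes back the timestamp it encoded; for
    # magic 0 it would report NO_TIMESTAMP (-1) regardless of the input.
    print(meta.offset, meta.crc, meta.size, meta.timestamp)
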
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index c6a28be..4ed992c 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -131,14 +131,13 @@ class MemoryRecordsBuilder(object):
return None, 0
offset = self._next_offset
- checksum, actual_size = self._builder.append(
- offset, timestamp, key, value)
+ metadata = self._builder.append(offset, timestamp, key, value)
# Return of 0 size means there's no space to add a new message
- if actual_size == 0:
- return None, 0
+ if metadata is None:
+ return None
self._next_offset += 1
- return checksum, actual_size
+ return metadata
def close(self):
# This method may be called multiple times on the same batch
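
A full batch is now signalled by a plain None return instead of the old (None, 0) tuple, and that is the signal ProducerBatch.try_append checks above. A runnable sketch mirroring test_legacy_batch_size_limit below (constructor arguments again assumed from the class's definition):

    from kafka.record.memory_records import MemoryRecordsBuilder

    # batch_size is deliberately small so the second record does not fit.
    records = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=1024)
    assert records.append(timestamp=None, key=None, value=b"M" * 700) is not None
    assert records.append(timestamp=None, key=None, value=b"M" * 700) is None
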
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
index 2d76695..ffe8a35 100644
--- a/test/record/test_legacy_records.py
+++ b/test/record/test_legacy_records.py
@@ -1,8 +1,8 @@
+from __future__ import unicode_literals
import pytest
from kafka.record.legacy_records import (
LegacyRecordBatch, LegacyRecordBatchBuilder
)
-from kafka.protocol.message import Message
@pytest.mark.parametrize("magic", [0, 1])
@@ -27,9 +27,9 @@ def test_read_write_serde_v0_v1_no_compression(magic):
@pytest.mark.parametrize("compression_type", [
- Message.CODEC_GZIP,
- Message.CODEC_SNAPPY,
- Message.CODEC_LZ4
+ LegacyRecordBatch.CODEC_GZIP,
+ LegacyRecordBatch.CODEC_SNAPPY,
+ LegacyRecordBatch.CODEC_LZ4
])
@pytest.mark.parametrize("magic", [0, 1])
def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
@@ -43,14 +43,14 @@ def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
batch = LegacyRecordBatch(bytes(buffer), magic)
msgs = list(batch)
- expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
for offset, msg in enumerate(msgs):
assert msg.offset == offset
assert msg.timestamp == (9999999 if magic else None)
assert msg.timestamp_type == (0 if magic else None)
assert msg.key == b"test"
assert msg.value == b"Super"
- assert msg.checksum == expected_checksum
+ assert msg.checksum == (-2095076219 if magic else 278251978) & \
+ 0xffffffff
@pytest.mark.parametrize("magic", [0, 1])
@@ -83,3 +83,84 @@ def test_estimate_size_in_bytes_bigger_than_batch(magic):
buf = builder.build()
assert len(buf) <= estimate_size, \
"Estimate should always be upper bound"
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_legacy_batch_builder_validates_arguments(magic):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=1024 * 1024)
+
+ # Key should not be str
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp=9999999, key="some string", value=None)
+
+ # Value should not be str
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp=9999999, key=None, value="some string")
+
+ # Timestamp should be of proper type
+ if magic != 0:
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp="1243812793", key=None, value=b"some string")
+
+ # Offset of invalid type
+ with pytest.raises(TypeError):
+ builder.append(
+ "0", timestamp=9999999, key=None, value=b"some string")
+
+ # Ok to pass value as None
+ builder.append(
+ 0, timestamp=9999999, key=b"123", value=None)
+
+ # Timestamp can be None
+ builder.append(
+ 1, timestamp=None, key=None, value=b"some string")
+
+ # Ok to pass offsets in not incremental order. This should not happen thou
+ builder.append(
+ 5, timestamp=9999999, key=b"123", value=None)
+
+ # in case error handling code fails to fix inner buffer in builder
+ assert len(builder.build()) == 119 if magic else 95
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_legacy_correct_metadata_response(magic):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=1024 * 1024)
+ meta = builder.append(
+ 0, timestamp=9999999, key=b"test", value=b"Super")
+
+ assert meta.offset == 0
+ assert meta.timestamp == (9999999 if magic else -1)
+ assert meta.crc == (-2095076219 if magic else 278251978) & 0xffffffff
+ assert repr(meta) == (
+ "LegacyRecordMetadata(offset=0, crc={}, size={}, "
+ "timestamp={})".format(meta.crc, meta.size, meta.timestamp)
+ )
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_legacy_batch_size_limit(magic):
+ # First message can be added even if it's too big
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=1024)
+ meta = builder.append(0, timestamp=None, key=None, value=b"M" * 2000)
+ assert meta.size > 0
+ assert meta.crc is not None
+ assert meta.offset == 0
+ assert meta.timestamp is not None
+ assert len(builder.build()) > 2000
+
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=1024)
+ meta = builder.append(0, timestamp=None, key=None, value=b"M" * 700)
+ assert meta is not None
+ meta = builder.append(1, timestamp=None, key=None, value=b"M" * 700)
+ assert meta is None
+ meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
+ assert meta is None
+ assert len(builder.build()) < 1000
diff --git a/test/test_producer.py b/test/test_producer.py
index 1f6608a..41bd52e 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,11 @@
import gc
import platform
-import sys
+import time
import threading
import pytest
-from kafka import KafkaConsumer, KafkaProducer
+from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
from test.conftest import version
from test.testutil import random_string
@@ -78,3 +78,53 @@ def test_kafka_producer_gc_cleanup():
del(producer)
gc.collect()
assert threading.active_count() == threads
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+ connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
+ producer = KafkaProducer(bootstrap_servers=connect_str,
+ retries=5,
+ max_block_ms=10000,
+ compression_type=compression)
+ if producer.config['api_version'] >= (0, 10):
+ magic = 1
+ else:
+ magic = 0
+
+ topic = random_string(5)
+ future = producer.send(
+ topic,
+ value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+ partition=0)
+ record = future.get(timeout=5)
+ assert record is not None
+ assert record.topic == topic
+ assert record.partition == 0
+ assert record.topic_partition == TopicPartition(topic, 0)
+ assert record.offset == 0
+ if magic >= 1:
+ assert record.timestamp == 9999999
+ else:
+ assert record.timestamp == -1 # NO_TIMESTAMP
+
+ if magic == 1:
+ assert record.checksum == 1370034956
+ else:
+ assert record.checksum == 3296137851
+
+ assert record.serialized_key_size == 10
+ assert record.serialized_value_size == 12
+
+ # generated timestamp case is skipped for broker 0.9 and below
+ if magic == 0:
+ return
+
+ send_time = time.time() * 1000
+ future = producer.send(
+ topic,
+ value=b"Simple value", key=b"Simple key", timestamp_ms=None,
+ partition=0)
+ record = future.get(timeout=5)
+ assert abs(record.timestamp - send_time) <= 1000 # Allow 1s deviation