author     Taras <voyn1991@gmail.com>    2017-10-10 00:13:16 +0300
committer  Taras <voyn1991@gmail.com>    2017-10-11 18:09:17 +0300
commit     fbea5f04bccd28f3aa15a1711548b131504591ac (patch)
tree       1c8a0efe687c2ace72fa146b4f03e15def8e3a95
parent     f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
download   kafka-python-fbea5f04bccd28f3aa15a1711548b131504591ac.tar.gz
Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
-rw-r--r--  kafka/consumer/fetcher.py             |  99
-rw-r--r--  kafka/errors.py                       |   7
-rw-r--r--  kafka/producer/buffer.py              | 126
-rw-r--r--  kafka/producer/kafka.py               |  43
-rw-r--r--  kafka/producer/record_accumulator.py  | 100
-rw-r--r--  kafka/producer/sender.py              |   1
-rw-r--r--  kafka/protocol/fetch.py               |  11
-rw-r--r--  kafka/protocol/legacy.py              |   6
-rw-r--r--  kafka/protocol/message.py             |  12
-rw-r--r--  kafka/protocol/produce.py             |   7
-rw-r--r--  kafka/record/__init__.py              |   3
-rw-r--r--  kafka/record/abc.py                   | 119
-rw-r--r--  kafka/record/legacy_records.py        | 485
-rw-r--r--  kafka/record/memory_records.py        | 176
-rw-r--r--  kafka/record/util.py                  |   8
-rw-r--r--  test/record/test_legacy_records.py    |  85
-rw-r--r--  test/record/test_records.py           | 108
-rw-r--r--  test/test_buffer.py                   |  72
-rw-r--r--  test/test_consumer_integration.py     |   6
-rw-r--r--  test/test_protocol.py                 |   5
-rw-r--r--  test/test_sender.py                   |  18
21 files changed, 1142 insertions, 355 deletions
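The read/write path introduced by this patch replaces MessageSet/Message with a small layered API: MemoryRecordsBuilder produces the raw batch bytes on the producer side, and MemoryRecords/LegacyRecordBatch parse them on the consumer side. A minimal round-trip sketch (not part of the patch; names come from the files added below):

    from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

    # Producer side: build a v1 batch and close it to get the raw bytes
    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16384)
    builder.append(timestamp=9999999, key=b"key", value=b"value")  # -> (checksum, size)
    builder.close()
    raw = builder.buffer()

    # Consumer side: iterate batches, then records within each batch
    records = MemoryRecords(raw)
    while records.has_next():
        for record in records.next_batch():
            print(record.offset, record.timestamp, record.key, record.value)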
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c4fa546..54a771a 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -13,10 +13,10 @@ import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import (
OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
+from kafka.record import MemoryRecords
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition, OffsetAndTimestamp
@@ -295,7 +295,7 @@ class Fetcher(six.Iterator):
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
- InvalidMessageError: if message crc validation fails (check_crcs
+ CorruptRecordException: if message crc validation fails (check_crcs
must be set to True)
RecordTooLargeError: if a message is larger than the currently
configured max_partition_fetch_bytes
@@ -440,57 +440,25 @@ class Fetcher(six.Iterator):
self._next_partition_records = None
- def _unpack_message_set(self, tp, messages):
+ def _unpack_message_set(self, tp, records):
try:
- for offset, size, msg in messages:
- if self.config['check_crcs'] and not msg.validate_crc():
- raise Errors.InvalidMessageError(msg)
-
- if not msg.is_compressed():
- yield self._parse_record(tp, offset, msg.timestamp, msg)
-
- else:
- # If relative offset is used, we need to decompress the entire message first
- # to compute the absolute offset.
- inner_mset = msg.decompress()
-
- # There should only ever be a single layer of compression
- if inner_mset[0][-1].is_compressed():
- log.warning('MessageSet at %s offset %d appears '
- ' double-compressed. This should not'
- ' happen -- check your producers!',
- tp, offset)
- if self.config['skip_double_compressed_messages']:
- log.warning('Skipping double-compressed message at'
- ' %s %d', tp, offset)
- continue
-
- if msg.magic > 0:
- last_offset, _, _ = inner_mset[-1]
- absolute_base_offset = offset - last_offset
- else:
- absolute_base_offset = -1
-
- for inner_offset, inner_size, inner_msg in inner_mset:
- if msg.magic > 0:
- # When magic value is greater than 0, the timestamp
- # of a compressed message depends on the
- # typestamp type of the wrapper message:
-
- if msg.timestamp_type == 0: # CREATE_TIME (0)
- inner_timestamp = inner_msg.timestamp
-
- elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
- inner_timestamp = msg.timestamp
-
- else:
- raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
- else:
- inner_timestamp = msg.timestamp
-
- if absolute_base_offset >= 0:
- inner_offset += absolute_base_offset
- yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+ batch = records.next_batch()
+ while batch is not None:
+ for record in batch:
+ key_size = len(record.key) if record.key is not None else -1
+ value_size = len(record.value) if record.value is not None else -1
+ key = self._deserialize(
+ self.config['key_deserializer'],
+ tp.topic, record.key)
+ value = self._deserialize(
+ self.config['value_deserializer'],
+ tp.topic, record.value)
+ yield ConsumerRecord(
+ tp.topic, tp.partition, record.offset, record.timestamp,
+ record.timestamp_type, key, value, record.checksum,
+ key_size, value_size)
+
+ batch = records.next_batch()
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
@@ -505,15 +473,6 @@ class Fetcher(six.Iterator):
log.exception('AssertionError raised unpacking messageset: %s', e)
raise
- def _parse_record(self, tp, offset, timestamp, msg):
- key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
- value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
- return ConsumerRecord(tp.topic, tp.partition, offset,
- timestamp, msg.timestamp_type,
- key, value, msg.crc,
- len(msg.key) if msg.key is not None else -1,
- len(msg.value) if msg.value is not None else -1)
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
@@ -783,7 +742,7 @@ class Fetcher(six.Iterator):
error_code, highwater = completed_fetch.partition_data[:2]
error_type = Errors.for_code(error_code)
- messages = completed_fetch.partition_data[-1]
+ records = MemoryRecords(completed_fetch.partition_data[-1])
try:
if not self._subscriptions.is_fetchable(tp):
@@ -807,21 +766,17 @@ class Fetcher(six.Iterator):
position)
return None
- partial = None
- if messages and isinstance(messages[-1][-1], PartialMessage):
- partial = messages.pop()
-
- if messages:
+ if records.has_next():
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
position)
- unpacked = list(self._unpack_message_set(tp, messages))
+ unpacked = list(self._unpack_message_set(tp, records))
parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
- last_offset, _, _ = messages[-1]
+ last_offset = unpacked[-1].offset
self._sensors.records_fetch_lag.record(highwater - last_offset)
- num_bytes = sum(msg[1] for msg in messages)
- records_count = len(messages)
- elif partial:
+ num_bytes = records.valid_bytes()
+ records_count = len(unpacked)
+ elif records.size_in_bytes() > 0:
# we did not read a single message from a non-empty
# buffer because that message's size is larger than
# fetch size, in this case record this exception
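With MemoryRecords the fetcher no longer needs PartialMessage to spot a truncated tail: a buffer holding only part of a record reports has_next() == False while size_in_bytes() stays positive, which is the condition the new code uses to flag an oversized message. An illustrative sketch with hand-built bytes (not from the patch):

    from kafka.record import MemoryRecords

    # 8-byte offset + 4-byte length claiming 32 bytes, but only 2 bytes follow
    truncated = b"\x00" * 8 + b"\x00\x00\x00\x20" + b"\xfe\xb0"
    records = MemoryRecords(truncated)
    assert records.has_next() is False     # no complete batch to unpack
    assert records.size_in_bytes() > 0     # yet the buffer was not empty
    # this is the branch above that records a "message too large" condition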
diff --git a/kafka/errors.py b/kafka/errors.py
index c72455a..4a409db 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -101,12 +101,15 @@ class OffsetOutOfRangeError(BrokerResponseError):
' maintained by the server for the given topic/partition.')
-class InvalidMessageError(BrokerResponseError):
+class CorruptRecordException(BrokerResponseError):
errno = 2
- message = 'INVALID_MESSAGE'
+ message = 'CORRUPT_MESSAGE'
description = ('This message has failed its CRC checksum, exceeds the'
' valid size, or is otherwise corrupt.')
+# Backward compatibility
+InvalidMessageError = CorruptRecordException
+
class UnknownTopicOrPartitionError(BrokerResponseError):
errno = 3
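Because the old name is kept as an alias, existing handlers that catch InvalidMessageError continue to work unchanged. Illustrative only:

    from kafka.errors import CorruptRecordException, InvalidMessageError

    assert InvalidMessageError is CorruptRecordException
    try:
        raise CorruptRecordException("crc check failed")
    except InvalidMessageError:
        pass  # old-style except clauses still catch the renamed exception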
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index d1eeaf1..19ea732 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -5,133 +5,9 @@ import io
import threading
import time
-from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_encode, snappy_encode,
- lz4_encode, lz4_encode_old_kafka)
-from .. import errors as Errors
from ..metrics.stats import Rate
-from ..protocol.types import Int32, Int64
-from ..protocol.message import MessageSet, Message
-
-
-class MessageSetBuffer(object):
- """Wrap a buffer for writing MessageSet batches.
-
- Arguments:
- buf (IO stream): a buffer for writing data. Typically BytesIO.
- batch_size (int): maximum number of bytes to write to the buffer.
-
- Keyword Arguments:
- compression_type ('gzip', 'snappy', None): compress messages before
- publishing. Default: None.
- """
- _COMPRESSORS = {
- 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
- 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
- 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
- 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
- }
- def __init__(self, buf, batch_size, compression_type=None, message_version=0):
- if compression_type is not None:
- assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
-
- # Kafka 0.8/0.9 had a quirky lz4...
- if compression_type == 'lz4' and message_version == 0:
- compression_type = 'lz4-old-kafka'
-
- checker, encoder, attributes = self._COMPRESSORS[compression_type]
- assert checker(), 'Compression Libraries Not Found'
- self._compressor = encoder
- self._compression_attributes = attributes
- else:
- self._compressor = None
- self._compression_attributes = None
-
- self._message_version = message_version
- self._buffer = buf
- # Init MessageSetSize to 0 -- update on close
- self._buffer.seek(0)
- self._buffer.write(Int32.encode(0))
- self._batch_size = batch_size
- self._closed = False
- self._messages = 0
- self._bytes_written = 4 # Int32 header is 4 bytes
- self._final_size = None
-
- def append(self, offset, message):
- """Append a Message to the MessageSet.
-
- Arguments:
- offset (int): offset of the message
- message (Message or bytes): message struct or encoded bytes
-
- Returns: bytes written
- """
- if isinstance(message, Message):
- encoded = message.encode()
- else:
- encoded = bytes(message)
- msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
- self._buffer.write(msg)
- self._messages += 1
- self._bytes_written += len(msg)
- return len(msg)
-
- def has_room_for(self, key, value):
- if self._closed:
- return False
- if not self._messages:
- return True
- needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
- if key is not None:
- needed_bytes += len(key)
- if value is not None:
- needed_bytes += len(value)
- return self._buffer.tell() + needed_bytes < self._batch_size
-
- def is_full(self):
- if self._closed:
- return True
- return self._buffer.tell() >= self._batch_size
-
- def close(self):
- # This method may be called multiple times on the same batch
- # i.e., on retries
- # we need to make sure we only close it out once
- # otherwise compressed messages may be double-compressed
- # see Issue 718
- if not self._closed:
- if self._compressor:
- # TODO: avoid copies with bytearray / memoryview
- uncompressed_size = self._buffer.tell()
- self._buffer.seek(4)
- msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
- attributes=self._compression_attributes,
- magic=self._message_version)
- encoded = msg.encode()
- self._buffer.seek(4)
- self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
- self._buffer.write(Int32.encode(len(encoded)))
- self._buffer.write(encoded)
-
- # Update the message set size (less the 4 byte header),
- # and return with buffer ready for full read()
- self._final_size = self._buffer.tell()
- self._buffer.seek(0)
- self._buffer.write(Int32.encode(self._final_size - 4))
-
- self._buffer.seek(0)
- self._closed = True
-
- def size_in_bytes(self):
- return self._final_size or self._buffer.tell()
-
- def compression_rate(self):
- return self.size_in_bytes() / self._bytes_written
-
- def buffer(self):
- return self._buffer
+import kafka.errors as Errors
class SimpleBufferPool(object):
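The close-once guarantee that MessageSetBuffer.close() documented above (Issue 718) now lives in MemoryRecordsBuilder: a second close() is a no-op and the finalized buffer is not re-compressed. A small sketch assuming the builder added later in this patch:

    from kafka.record.memory_records import MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=4096)
    builder.append(timestamp=None, key=None, value=b"payload")
    builder.close()
    buf = builder.buffer()
    builder.close()                # safe on retries -- nothing is re-encoded
    assert builder.buffer() is buf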
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index de9dcd2..f2a480b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,9 +12,10 @@ from ..vendor import six
from .. import errors as Errors
from ..client_async import KafkaClient, selectors
+from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
-from ..protocol.message import Message, MessageSet
+from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
from .future import FutureRecordMetadata, FutureProduceResult
@@ -310,6 +311,13 @@ class KafkaProducer(object):
'sasl_plain_password': None,
}
+ _COMPRESSORS = {
+ 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
+ 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
+ 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+ None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
+ }
+
def __init__(self, **configs):
log.debug("Starting the Kafka producer") # trace
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -355,7 +363,16 @@ class KafkaProducer(object):
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
- message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+ # Check compression_type for library support
+ ct = self.config['compression_type']
+ if ct not in self._COMPRESSORS:
+ raise ValueError("Not supported codec: {}".format(ct))
+ else:
+ checker, compression_attrs = self._COMPRESSORS[ct]
+ assert checker(), "Libraries for {} compression codec not found".format(ct)
+ self.config['compression_type'] = compression_attrs
+
+ message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
@@ -465,6 +482,17 @@ class KafkaProducer(object):
max_wait = self.config['max_block_ms'] / 1000.0
return self._wait_on_metadata(topic, max_wait)
+ def _max_usable_produce_magic(self):
+ if self.config['api_version'] >= (0, 10):
+ return 1
+ else:
+ return 0
+
+ def _estimate_size_in_bytes(self, key, value):
+ magic = self._max_usable_produce_magic()
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
+
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
@@ -514,11 +542,7 @@ class KafkaProducer(object):
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
- message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
- if key_bytes is not None:
- message_size += len(key_bytes)
- if value_bytes is not None:
- message_size += len(value_bytes)
+ message_size = self._estimate_size_in_bytes(key, value)
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
@@ -527,11 +551,12 @@ class KafkaProducer(object):
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes,
- self.config['max_block_ms'])
+ self.config['max_block_ms'],
+ estimated_size=message_size)
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or"
- " getting a new batch", tp)
+ " getting a new batch", tp)
self._sender.wakeup()
return future
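KafkaProducer now sizes outgoing records through LegacyRecordBatchBuilder instead of MessageSet/Message header constants, and validates the codec (and its library) up front via the _COMPRESSORS table above. A sketch of the size estimate (illustrative; the key/value literals are made up):

    from kafka.record.legacy_records import LegacyRecordBatchBuilder

    magic = 1   # what _max_usable_produce_magic() returns for brokers >= 0.10
    size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
        magic, LegacyRecordBatchBuilder.CODEC_NONE, key=b"key", value=b"value")
    # estimate_size_in_bytes() is an upper bound; send() passes this estimate
    # to _ensure_valid_record_size() before appending to the accumulator.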
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index fa835f3..0c0ce27 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -7,10 +7,11 @@ import threading
import time
from .. import errors as Errors
-from ..protocol.message import Message, MessageSet
-from .buffer import MessageSetBuffer, SimpleBufferPool
+from .buffer import SimpleBufferPool
from .future import FutureRecordMetadata, FutureProduceResult
from ..structs import TopicPartition
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.record.legacy_records import LegacyRecordBatchBuilder
log = logging.getLogger(__name__)
@@ -35,9 +36,8 @@ class AtomicInteger(object):
return self._val
-class RecordBatch(object):
- def __init__(self, tp, records, message_version=0):
- self.record_count = 0
+class ProducerBatch(object):
+ def __init__(self, tp, records, buffer):
self.max_record_size = 0
now = time.time()
self.created = now
@@ -46,35 +46,33 @@ class RecordBatch(object):
self.last_attempt = now
self.last_append = now
self.records = records
- self.message_version = message_version
self.topic_partition = tp
self.produce_future = FutureProduceResult(tp)
self._retry = False
+ self._buffer = buffer # We only save it, we don't write to it
+
+ @property
+ def record_count(self):
+ return self.records.next_offset()
def try_append(self, timestamp_ms, key, value):
- if not self.records.has_room_for(key, value):
+ offset = self.records.next_offset()
+ checksum, record_size = self.records.append(timestamp_ms, key, value)
+ if record_size == 0:
return None
- if self.message_version == 0:
- msg = Message(value, key=key, magic=self.message_version)
- else:
- msg = Message(value, key=key, magic=self.message_version,
- timestamp=timestamp_ms)
- record_size = self.records.append(self.record_count, msg)
- checksum = msg.crc # crc is recalculated during records.append()
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
- future = FutureRecordMetadata(self.produce_future, self.record_count,
+ future = FutureRecordMetadata(self.produce_future, offset,
timestamp_ms, checksum,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
- self.record_count += 1
return future
def done(self, base_offset=None, timestamp_ms=None, exception=None):
log.debug("Produced messages to topic-partition %s with base offset"
" %s and error %s.", self.topic_partition, base_offset,
- exception) # trace
+ exception) # trace
if self.produce_future.is_done:
log.warning('Batch is already closed -- ignoring batch.done()')
return
@@ -113,7 +111,7 @@ class RecordBatch(object):
self.records.close()
self.done(-1, None, Errors.KafkaTimeoutError(
"Batch for %s containing %s record(s) expired: %s" % (
- self.topic_partition, self.record_count, error)))
+ self.topic_partition, self.records.next_offset(), error)))
return True
return False
@@ -123,9 +121,12 @@ class RecordBatch(object):
def set_retry(self):
self._retry = True
+ def buffer(self):
+ return self._buffer
+
def __str__(self):
- return 'RecordBatch(topic_partition=%s, record_count=%d)' % (
- self.topic_partition, self.record_count)
+ return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
+ self.topic_partition, self.records.next_offset())
class RecordAccumulator(object):
@@ -148,8 +149,9 @@ class RecordAccumulator(object):
will block up to max_block_ms, raising an exception on timeout.
In the current implementation, this setting is an approximation.
Default: 33554432 (32MB)
- compression_type (str): The compression type for all data generated by
- the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+ compression_type (int): The compression type for all data generated by
+ the producer. Valid values are gzip(1), snappy(2), lz4(3), or
+ none(0).
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
@@ -174,28 +176,41 @@ class RecordAccumulator(object):
'metric_group_prefix': 'producer-metrics',
}
+ _COMPRESSORS = {
+ 'gzip': LegacyRecordBatchBuilder.CODEC_GZIP,
+ 'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY,
+ 'lz4': LegacyRecordBatchBuilder.CODEC_LZ4,
+ None: LegacyRecordBatchBuilder.CODEC_NONE
+ }
+
def __init__(self, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
+ # Convert compression type to INT representation. Mostly for unit tests,
+ # as Producer should pass already converted values.
+ ct = self.config["compression_type"]
+ self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
+
self._closed = False
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
- self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
+ self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
self._free = SimpleBufferPool(self.config['buffer_memory'],
self.config['batch_size'],
metrics=self.config['metrics'],
metric_group_prefix=self.config['metric_group_prefix'])
- self._incomplete = IncompleteRecordBatches()
+ self._incomplete = IncompleteProducerBatches()
# The following variables should only be accessed by the sender thread,
# so we don't need to protect them w/ locking.
self.muted = set()
self._drain_index = 0
- def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
+ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+ estimated_size=0):
"""Add a record to the accumulator, return the append result.
The append result will contain the future metadata, and flag for
@@ -215,8 +230,8 @@ class RecordAccumulator(object):
"""
assert isinstance(tp, TopicPartition), 'not TopicPartition'
assert not self._closed, 'RecordAccumulator is closed'
- # We keep track of the number of appending thread to make sure we do not miss batches in
- # abortIncompleteBatches().
+ # We keep track of the number of appending threads to make sure we do
+ # not miss batches in abortIncompleteBatches().
self._appends_in_progress.increment()
try:
if tp not in self._tp_locks:
@@ -234,15 +249,7 @@ class RecordAccumulator(object):
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
- # we don't have an in-progress record batch try to allocate a new batch
- message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
- if key is not None:
- message_size += len(key)
- if value is not None:
- message_size += len(value)
- assert message_size <= self.config['buffer_memory'], 'message too big'
-
- size = max(self.config['batch_size'], message_size)
+ size = max(self.config['batch_size'], estimated_size)
log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
buf = self._free.allocate(size, max_time_to_block_ms)
with self._tp_locks[tp]:
@@ -260,10 +267,13 @@ class RecordAccumulator(object):
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
- records = MessageSetBuffer(buf, self.config['batch_size'],
- self.config['compression_type'],
- self.config['message_version'])
- batch = RecordBatch(tp, records, self.config['message_version'])
+ records = MemoryRecordsBuilder(
+ self.config['message_version'],
+ self.config['compression_type'],
+ self.config['batch_size']
+ )
+
+ batch = ProducerBatch(tp, records, buf)
future = batch.try_append(timestamp_ms, key, value)
if not future:
raise Exception()
@@ -285,7 +295,7 @@ class RecordAccumulator(object):
cluster (ClusterMetadata): current metadata for kafka cluster
Returns:
- list of RecordBatch that were expired
+ list of ProducerBatch that were expired
"""
expired_batches = []
to_remove = []
@@ -449,7 +459,7 @@ class RecordAccumulator(object):
max_size (int): maximum number of bytes to drain
Returns:
- dict: {node_id: list of RecordBatch} with total size less than the
+ dict: {node_id: list of ProducerBatch} with total size less than the
requested max_size.
"""
if not nodes:
@@ -505,7 +515,7 @@ class RecordAccumulator(object):
def deallocate(self, batch):
"""Deallocate the record batch."""
self._incomplete.remove(batch)
- self._free.deallocate(batch.records.buffer())
+ self._free.deallocate(batch.buffer())
def _flush_in_progress(self):
"""Are there any threads currently waiting on a flush?"""
@@ -571,8 +581,8 @@ class RecordAccumulator(object):
self._closed = True
-class IncompleteRecordBatches(object):
- """A threadsafe helper class to hold RecordBatches that haven't been ack'd yet"""
+class IncompleteProducerBatches(object):
+ """A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet"""
def __init__(self):
self._incomplete = set()
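ProducerBatch now delegates both offset assignment and the "batch full" signal to MemoryRecordsBuilder: append() returns a zero size when there is no room, which is exactly when try_append() returns None and the accumulator starts a new batch. A standalone sketch of that contract (batch and record sizes chosen arbitrarily):

    from kafka.record.memory_records import MemoryRecordsBuilder

    records = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=200)
    while True:
        offset = records.next_offset()        # offset the next record would get
        checksum, size = records.append(None, b"key", b"value" * 10)
        if size == 0:                         # builder full -> try_append() would return None
            break
        print("appended", offset, "crc", checksum, "bytes", size)
    print("records in batch:", records.next_offset())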
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 679efb0..72a15bb 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -288,7 +288,6 @@ class Sender(threading.Thread):
topic = batch.topic_partition.topic
partition = batch.topic_partition.partition
- # TODO: bytearray / memoryview
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 359f197..0b03845 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -1,8 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .message import MessageSet
-from .types import Array, Int8, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes
class FetchResponse_v0(Response):
@@ -15,7 +14,7 @@ class FetchResponse_v0(Response):
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
@@ -30,7 +29,7 @@ class FetchResponse_v1(Response):
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
@@ -61,7 +60,7 @@ class FetchResponse_v4(Response):
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
@@ -81,7 +80,7 @@ class FetchResponse_v5(Response):
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 37145b7..b8f84e7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -19,6 +19,7 @@ from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, relative_unpack,
write_int_string, group_by_topic_and_partition)
+from kafka.protocol.message import MessageSet
log = logging.getLogger(__name__)
@@ -144,7 +145,7 @@ class KafkaProtocol(object):
magic=msg.magic, attributes=msg.attributes
)
partition_msgs.append((0, m.encode()))
- topic_msgs.append((partition, partition_msgs))
+ topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False)))
topics.append((topic, topic_msgs))
@@ -215,7 +216,8 @@ class KafkaProtocol(object):
]
@classmethod
- def decode_message_set(cls, messages):
+ def decode_message_set(cls, raw_data):
+ messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data))
for offset, _, message in messages:
if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
inner_messages = message.decompress()
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 70d5b36..f5a51a9 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -154,12 +154,13 @@ class MessageSet(AbstractType):
HEADER_SIZE = 12 # offset + message_size
@classmethod
- def encode(cls, items):
+ def encode(cls, items, prepend_size=True):
# RecordAccumulator encodes messagesets internally
if isinstance(items, (io.BytesIO, KafkaBytes)):
size = Int32.decode(items)
- # rewind and return all the bytes
- items.seek(items.tell() - 4)
+ if prepend_size:
+ # rewind and return all the bytes
+ items.seek(items.tell() - 4)
return items.read(size + 4)
encoded_values = []
@@ -167,7 +168,10 @@ class MessageSet(AbstractType):
encoded_values.append(Int64.encode(offset))
encoded_values.append(Bytes.encode(message))
encoded = b''.join(encoded_values)
- return Bytes.encode(encoded)
+ if prepend_size:
+ return Bytes.encode(encoded)
+ else:
+ return encoded
@classmethod
def decode(cls, data, bytes_to_read=None):
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index da1f308..34ff949 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -1,8 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .message import MessageSet
-from .types import Int16, Int32, Int64, String, Array, Schema
+from .types import Int16, Int32, Int64, String, Array, Schema, Bytes
class ProduceResponse_v0(Response):
@@ -64,7 +63,7 @@ class ProduceRequest_v0(Request):
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
- ('messages', MessageSet)))))
+ ('messages', Bytes)))))
)
def expect_response(self):
@@ -109,7 +108,7 @@ class ProduceRequest_v3(Request):
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
- ('messages', MessageSet)))))
+ ('messages', Bytes)))))
)
def expect_response(self):
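With the 'messages' field typed as plain Bytes, the sender can hand the closed builder's buffer straight to the protocol layer (this mirrors `buf = batch.records.buffer()` in sender.py above). A hedged sketch; the topic name and acks/timeout values are made up, and the request version is normally chosen from api_version:

    from kafka.protocol.produce import ProduceRequest
    from kafka.record.memory_records import MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16384)
    builder.append(None, b"k", b"v")
    builder.close()

    request = ProduceRequest[2](
        required_acks=1, timeout=30000,
        topics=[("example-topic", [(0, builder.buffer())])])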
diff --git a/kafka/record/__init__.py b/kafka/record/__init__.py
new file mode 100644
index 0000000..4c75acb
--- /dev/null
+++ b/kafka/record/__init__.py
@@ -0,0 +1,3 @@
+from .memory_records import MemoryRecords
+
+__all__ = ["MemoryRecords"]
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
new file mode 100644
index 0000000..4f14d76
--- /dev/null
+++ b/kafka/record/abc.py
@@ -0,0 +1,119 @@
+from __future__ import absolute_import
+import abc
+
+
+class ABCRecord(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractproperty
+ def offset(self):
+ """ Absolute offset of record
+ """
+
+ @abc.abstractproperty
+ def timestamp(self):
+ """ Epoch milliseconds
+ """
+
+ @abc.abstractproperty
+ def timestamp_type(self):
+ """ CREATE_TIME(0) or APPEND_TIME(1)
+ """
+
+ @abc.abstractproperty
+ def key(self):
+ """ Bytes key or None
+ """
+
+ @abc.abstractproperty
+ def value(self):
+ """ Bytes value or None
+ """
+
+ @abc.abstractproperty
+ def checksum(self):
+ """ Prior to v2 format CRC was contained in every message. This will
+ be the checksum for v0 and v1 and None for v2 and above.
+ """
+
+ @abc.abstractproperty
+ def headers(self):
+ """ If supported by version list of key-value tuples, or empty list if
+ not supported by format.
+ """
+
+
+class ABCRecordBatchBuilder(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def append(self, offset, timestamp, key, value, headers):
+ """ Writes record to internal buffer.
+
+ Arguments:
+ offset (int): Relative offset of record, starting from 0
+ timestamp (int): Timestamp in milliseconds since beginning of the
+ epoch (midnight Jan 1, 1970 (UTC))
+ key (bytes or None): Key of the record
+ value (bytes or None): Value of the record
+ headers (List[Tuple[str, bytes]]): Headers of the record. Header
+ keys can not be ``None``.
+
+ Returns:
+ (bytes, int): Checksum of the written record (or None for v2 and
+ above) and size of the written record.
+ """
+
+ @abc.abstractmethod
+ def size_in_bytes(self, offset, timestamp, key, value, headers):
+ """ Return the expected size change on buffer (uncompressed) if we add
+ this message. This will account for varint size changes and give a
+ reliable size.
+ """
+
+ @abc.abstractmethod
+ def build(self):
+ """ Close for append, compress if needed, write size and header and
+ return a ready to send bytes object.
+
+ Return:
+ io.BytesIO: finished batch, ready to send.
+ """
+
+
+class ABCRecordBatch(object):
+ """ For v2 incapsulates a RecordBatch, for v0/v1 a single (maybe
+ compressed) message.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __iter__(self):
+ """ Return iterator over records (ABCRecord instances). Will decompress
+ if needed.
+ """
+
+
+class ABCRecords(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __init__(self, buffer):
+ """ Initialize with bytes-like object conforming to the buffer
+ interface (i.e. bytes, bytearray, memoryview, etc.).
+ """
+
+ @abc.abstractmethod
+ def size_in_bytes(self):
+ """ Returns the size of buffer.
+ """
+
+ @abc.abstractmethod
+ def next_batch(self):
+ """ Return next batch of records (ABCRecordBatch instances).
+ """
+
+ @abc.abstractmethod
+ def has_next(self):
+ """ True if there are more batches to read, False otherwise.
+ """
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
new file mode 100644
index 0000000..3d9822d
--- /dev/null
+++ b/kafka/record/legacy_records.py
@@ -0,0 +1,485 @@
+# See:
+# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\
+# apache/kafka/common/record/LegacyRecord.java
+
+# Builder and reader implementation for V0 and V1 record versions. As of Kafka
+# 0.11.0.0 those were replaced with V2, thus the Legacy naming.
+
+# The schema is given below (see
+# https://kafka.apache.org/protocol#protocol_message_sets for more details):
+
+# MessageSet => [Offset MessageSize Message]
+# Offset => int64
+# MessageSize => int32
+
+# v0
+# Message => Crc MagicByte Attributes Key Value
+# Crc => int32
+# MagicByte => int8
+# Attributes => int8
+# Key => bytes
+# Value => bytes
+
+# v1 (supported since 0.10.0)
+# Message => Crc MagicByte Attributes Key Value
+# Crc => int32
+# MagicByte => int8
+# Attributes => int8
+# Timestamp => int64
+# Key => bytes
+# Value => bytes
+
+# The message attribute bits are given below:
+# * Unused (4-7)
+# * Timestamp Type (3) (added in V1)
+# * Compression Type (0-2)
+
+# Note that when compression is enabled (see attributes above), the whole
+# MessageSet is compressed and placed into a single message as the `value`.
+# Only the parent message is marked with `compression` bits in attributes.
+
+# The CRC covers the data from the Magic byte to the end of the message.
+
+
+import struct
+import time
+
+from .abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
+from .util import calc_crc32
+
+from kafka.codec import (
+ gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
+ gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+)
+from kafka.errors import CorruptRecordException
+
+
+class LegacyRecordBase(object):
+
+ HEADER_STRUCT_V0 = struct.Struct(
+ ">q" # BaseOffset => Int64
+ "i" # Length => Int32
+ "I" # CRC => Int32
+ "b" # Magic => Int8
+ "b" # Attributes => Int8
+ )
+ HEADER_STRUCT_V1 = struct.Struct(
+ ">q" # BaseOffset => Int64
+ "i" # Length => Int32
+ "I" # CRC => Int32
+ "b" # Magic => Int8
+ "b" # Attributes => Int8
+ "q" # timestamp => Int64
+ )
+
+ LOG_OVERHEAD = CRC_OFFSET = struct.calcsize(
+ ">q" # Offset
+ "i" # Size
+ )
+ MAGIC_OFFSET = LOG_OVERHEAD + struct.calcsize(
+ ">I" # CRC
+ )
+ # Those are used for fast size calculations
+ RECORD_OVERHEAD_V0 = struct.calcsize(
+ ">I" # CRC
+ "b" # magic
+ "b" # attributes
+ "i" # Key length
+ "i" # Value length
+ )
+ RECORD_OVERHEAD_V1 = struct.calcsize(
+ ">I" # CRC
+ "b" # magic
+ "b" # attributes
+ "q" # timestamp
+ "i" # Key length
+ "i" # Value length
+ )
+
+ KEY_OFFSET_V0 = HEADER_STRUCT_V0.size
+ KEY_OFFSET_V1 = HEADER_STRUCT_V1.size
+ KEY_LENGTH = VALUE_LENGTH = struct.calcsize(">i") # Bytes length is Int32
+
+ CODEC_MASK = 0x07
+ CODEC_NONE = 0x00
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
+ CODEC_LZ4 = 0x03
+ TIMESTAMP_TYPE_MASK = 0x08
+
+ LOG_APPEND_TIME = 1
+ CREATE_TIME = 0
+
+
+class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
+
+ def __init__(self, buffer, magic):
+ self._buffer = memoryview(buffer)
+ self._magic = magic
+
+ offset, length, crc, magic_, attrs, timestamp = self._read_header(0)
+ assert length == len(buffer) - self.LOG_OVERHEAD
+ assert magic == magic_
+
+ self._offset = offset
+ self._crc = crc
+ self._timestamp = timestamp
+ self._attributes = attrs
+ self._decompressed = False
+
+ @property
+ def timestamp_type(self):
+ """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
+
+ Value is determined by broker; produced messages should always set to 0
+ Requires Kafka >= 0.10 / message version >= 1
+ """
+ if self._magic == 0:
+ return None
+ elif self._attributes & self.TIMESTAMP_TYPE_MASK:
+ return 1
+ else:
+ return 0
+
+ @property
+ def compression_type(self):
+ return self._attributes & self.CODEC_MASK
+
+ def validate_crc(self):
+ crc = calc_crc32(self._buffer[self.MAGIC_OFFSET:])
+ return self._crc == crc
+
+ def _decompress(self, key_offset):
+ # Copy of `_read_key_value`, but uses memoryview
+ pos = key_offset
+ key_size = struct.unpack_from(">i", self._buffer, pos)[0]
+ pos += self.KEY_LENGTH
+ if key_size != -1:
+ pos += key_size
+ value_size = struct.unpack_from(">i", self._buffer, pos)[0]
+ pos += self.VALUE_LENGTH
+ if value_size == -1:
+ raise CorruptRecordException("Value of compressed message is None")
+ else:
+ data = self._buffer[pos:pos + value_size]
+
+ compression_type = self.compression_type
+ if compression_type == self.CODEC_GZIP:
+ uncompressed = gzip_decode(data)
+ elif compression_type == self.CODEC_SNAPPY:
+ uncompressed = snappy_decode(data.tobytes())
+ elif compression_type == self.CODEC_LZ4:
+ if self._magic == 0:
+ uncompressed = lz4_decode_old_kafka(data.tobytes())
+ else:
+ uncompressed = lz4_decode(data.tobytes())
+ return uncompressed
+
+ def _read_header(self, pos):
+ if self._magic == 0:
+ offset, length, crc, magic_read, attrs = \
+ self.HEADER_STRUCT_V0.unpack_from(self._buffer, pos)
+ timestamp = None
+ else:
+ offset, length, crc, magic_read, attrs, timestamp = \
+ self.HEADER_STRUCT_V1.unpack_from(self._buffer, pos)
+ return offset, length, crc, magic_read, attrs, timestamp
+
+ def _read_all_headers(self):
+ pos = 0
+ msgs = []
+ buffer_len = len(self._buffer)
+ while pos < buffer_len:
+ header = self._read_header(pos)
+ msgs.append((header, pos))
+ pos += self.LOG_OVERHEAD + header[1] # length
+ return msgs
+
+ def _read_key_value(self, pos):
+ key_size = struct.unpack_from(">i", self._buffer, pos)[0]
+ pos += self.KEY_LENGTH
+ if key_size == -1:
+ key = None
+ else:
+ key = self._buffer[pos:pos + key_size].tobytes()
+ pos += key_size
+
+ value_size = struct.unpack_from(">i", self._buffer, pos)[0]
+ pos += self.VALUE_LENGTH
+ if value_size == -1:
+ value = None
+ else:
+ value = self._buffer[pos:pos + value_size].tobytes()
+ return key, value
+
+ def __iter__(self):
+ if self._magic == 1:
+ key_offset = self.KEY_OFFSET_V1
+ else:
+ key_offset = self.KEY_OFFSET_V0
+ timestamp_type = self.timestamp_type
+
+ if self.compression_type:
+ # In case we will call iter again
+ if not self._decompressed:
+ self._buffer = memoryview(self._decompress(key_offset))
+ self._decompressed = True
+
+ # If relative offset is used, we need to decompress the entire
+ # message first to compute the absolute offset.
+ headers = self._read_all_headers()
+ if self._magic > 0:
+ msg_header, _ = headers[-1]
+ absolute_base_offset = self._offset - msg_header[0]
+ else:
+ absolute_base_offset = -1
+
+ for header, msg_pos in headers:
+ offset, _, crc, _, attrs, timestamp = header
+ # There should only ever be a single layer of compression
+ assert not attrs & self.CODEC_MASK, (
+ 'MessageSet at offset %d appears double-compressed. This '
+ 'should not happen -- check your producers!' % offset)
+
+ # When magic value is greater than 0, the timestamp
+ # of a compressed message depends on the
+ # timestamp type of the wrapper message:
+ if timestamp_type == self.LOG_APPEND_TIME:
+ timestamp = self._timestamp
+
+ if absolute_base_offset >= 0:
+ offset += absolute_base_offset
+
+ key, value = self._read_key_value(msg_pos + key_offset)
+ yield LegacyRecord(
+ offset, timestamp, timestamp_type,
+ key, value, crc)
+ else:
+ key, value = self._read_key_value(key_offset)
+ yield LegacyRecord(
+ self._offset, self._timestamp, timestamp_type,
+ key, value, self._crc)
+
+
+class LegacyRecord(ABCRecord):
+
+ __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
+ "_crc")
+
+ def __init__(self, offset, timestamp, timestamp_type, key, value, crc):
+ self._offset = offset
+ self._timestamp = timestamp
+ self._timestamp_type = timestamp_type
+ self._key = key
+ self._value = value
+ self._crc = crc
+
+ @property
+ def offset(self):
+ return self._offset
+
+ @property
+ def timestamp(self):
+ """ Epoch milliseconds
+ """
+ return self._timestamp
+
+ @property
+ def timestamp_type(self):
+ """ CREATE_TIME(0) or APPEND_TIME(1)
+ """
+ return self._timestamp_type
+
+ @property
+ def key(self):
+ """ Bytes key or None
+ """
+ return self._key
+
+ @property
+ def value(self):
+ """ Bytes value or None
+ """
+ return self._value
+
+ @property
+ def headers(self):
+ return []
+
+ @property
+ def checksum(self):
+ return self._crc
+
+ def __repr__(self):
+ return (
+ "LegacyRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
+ " key={!r}, value={!r}, crc={!r})".format(
+ self._offset, self._timestamp, self._timestamp_type,
+ self._key, self._value, self._crc)
+ )
+
+
+class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
+
+ def __init__(self, magic, compression_type, batch_size):
+ self._magic = magic
+ self._compression_type = compression_type
+ self._batch_size = batch_size
+ self._buffer = bytearray()
+
+ def append(self, offset, timestamp, key, value):
+ """ Append message to batch.
+ """
+ # Check types
+ if type(offset) != int:
+ raise TypeError(offset)
+ if timestamp is None:
+ timestamp = int(time.time() * 1000)
+ elif type(timestamp) != int:
+ raise TypeError(timestamp)
+ if not (key is None or
+ isinstance(key, (bytes, bytearray, memoryview))):
+ raise TypeError(
+ "Not supported type for key: {}".format(type(key)))
+ if not (value is None or
+ isinstance(value, (bytes, bytearray, memoryview))):
+ raise TypeError(
+ "Not supported type for value: {}".format(type(value)))
+
+ # Check if we have room for another message
+ pos = len(self._buffer)
+ size = self.size_in_bytes(offset, timestamp, key, value)
+ # We always allow at least one record to be appended
+ if offset != 0 and pos + size >= self._batch_size:
+ return None, 0
+
+ # Allocate proper buffer length
+ self._buffer.extend(bytearray(size))
+
+ # Encode message
+ crc = self._encode_msg(pos, offset, timestamp, key, value)
+
+ return crc, size
+
+ def _encode_msg(self, start_pos, offset, timestamp, key, value,
+ attributes=0):
+ """ Encode msg data into the `msg_buffer`, which should be allocated
+ to at least the size of this message.
+ """
+ magic = self._magic
+ buf = self._buffer
+ pos = start_pos
+
+ # Write key and value
+ pos += self.KEY_OFFSET_V0 if magic == 0 else self.KEY_OFFSET_V1
+
+ if key is None:
+ struct.pack_into(">i", buf, pos, -1)
+ pos += self.KEY_LENGTH
+ else:
+ key_size = len(key)
+ struct.pack_into(">i", buf, pos, key_size)
+ pos += self.KEY_LENGTH
+ buf[pos: pos + key_size] = key
+ pos += key_size
+
+ if value is None:
+ struct.pack_into(">i", buf, pos, -1)
+ pos += self.VALUE_LENGTH
+ else:
+ value_size = len(value)
+ struct.pack_into(">i", buf, pos, value_size)
+ pos += self.VALUE_LENGTH
+ buf[pos: pos + value_size] = value
+ pos += value_size
+ length = (pos - start_pos) - self.LOG_OVERHEAD
+
+ # Write msg header. Note that the CRC is filled in just below
+ if magic == 0:
+ self.HEADER_STRUCT_V0.pack_into(
+ buf, start_pos,
+ offset, length, 0, magic, attributes)
+ else:
+ self.HEADER_STRUCT_V1.pack_into(
+ buf, start_pos,
+ offset, length, 0, magic, attributes, timestamp)
+
+ # Calculate CRC for msg
+ crc_data = memoryview(buf)[start_pos + self.MAGIC_OFFSET:]
+ crc = calc_crc32(crc_data)
+ struct.pack_into(">I", buf, start_pos + self.CRC_OFFSET, crc)
+ return crc
+
+ def _maybe_compress(self):
+ if self._compression_type:
+ if self._compression_type == self.CODEC_GZIP:
+ compressed = gzip_encode(bytes(self._buffer))
+ elif self._compression_type == self.CODEC_SNAPPY:
+ compressed = snappy_encode(self._buffer)
+ elif self._compression_type == self.CODEC_LZ4:
+ if self._magic == 0:
+ compressed = lz4_encode_old_kafka(bytes(self._buffer))
+ else:
+ compressed = lz4_encode(bytes(self._buffer))
+ size = self.size_in_bytes(
+ 0, timestamp=0, key=None, value=compressed)
+ # We will try to reuse the same buffer if we have enough space
+ if size > len(self._buffer):
+ self._buffer = bytearray(size)
+ else:
+ del self._buffer[size:]
+ self._encode_msg(
+ start_pos=0,
+ offset=0, timestamp=0, key=None, value=compressed,
+ attributes=self._compression_type)
+ return True
+ return False
+
+ def build(self):
+ """Compress batch to be ready for send"""
+ self._maybe_compress()
+ return self._buffer
+
+ def size(self):
+ """ Return current size of data written to buffer
+ """
+ return len(self._buffer)
+
+ # Size calculations. Just copied Java's implementation
+
+ def size_in_bytes(self, offset, timestamp, key, value, headers=None):
+ """ Actual size of message to add
+ """
+ assert not headers, "Headers not supported in v0/v1"
+ magic = self._magic
+ return self.LOG_OVERHEAD + self.record_size(magic, key, value)
+
+ @classmethod
+ def record_size(cls, magic, key, value):
+ message_size = cls.record_overhead(magic)
+ if key is not None:
+ message_size += len(key)
+ if value is not None:
+ message_size += len(value)
+ return message_size
+
+ @classmethod
+ def record_overhead(cls, magic):
+ assert magic in [0, 1], "Not supported magic"
+ if magic == 0:
+ return cls.RECORD_OVERHEAD_V0
+ else:
+ return cls.RECORD_OVERHEAD_V1
+
+ @classmethod
+ def estimate_size_in_bytes(cls, magic, compression_type, key, value):
+ """ Upper bound estimate of record size.
+ """
+ assert magic in [0, 1], "Not supported magic"
+ # In case of compression we may need another overhead for inner msg
+ if compression_type:
+ return (
+ cls.LOG_OVERHEAD + cls.record_overhead(magic) +
+ cls.record_size(magic, key, value)
+ )
+ return cls.LOG_OVERHEAD + cls.record_size(magic, key, value)
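When a codec is set, the builder stores the whole inner message set as the value of a single wrapper message (see the format notes at the top of this file), and LegacyRecordBatch transparently decompresses it on iteration. A round-trip sketch mirroring test_legacy_records.py below:

    from kafka.record.legacy_records import (
        LegacyRecordBatch, LegacyRecordBatchBuilder
    )

    builder = LegacyRecordBatchBuilder(
        magic=1, compression_type=LegacyRecordBatchBuilder.CODEC_GZIP,
        batch_size=9999999)
    for offset in range(3):
        builder.append(offset, timestamp=9999999, key=b"test", value=b"Super")
    buf = builder.build()             # compresses into the wrapper message

    batch = LegacyRecordBatch(bytes(buf), magic=1)
    assert [record.offset for record in batch] == [0, 1, 2]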
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
new file mode 100644
index 0000000..c6a28be
--- /dev/null
+++ b/kafka/record/memory_records.py
@@ -0,0 +1,176 @@
+# This class takes advantage of the fact that message storage formats v0, v1
+# and v2 all have the same byte offsets for the Length and Magic fields.
+# Let's look closely at the leading bytes all versions share:
+#
+# V0 and V1 (Offset is MessageSet part, other bytes are Message ones):
+# Offset => Int64
+# BytesLength => Int32
+# CRC => Int32
+# Magic => Int8
+# ...
+#
+# V2:
+# BaseOffset => Int64
+# Length => Int32
+# PartitionLeaderEpoch => Int32
+# Magic => Int8
+# ...
+#
+# So we can iterate over batches just by knowing offsets of Length. Magic is
+# used to construct the correct class for Batch itself.
+
+import struct
+
+from kafka.errors import CorruptRecordException
+from .abc import ABCRecords
+from .legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder
+
+
+class MemoryRecords(ABCRecords):
+
+ LENGTH_OFFSET = struct.calcsize(">q")
+ LOG_OVERHEAD = struct.calcsize(">qi")
+ MAGIC_OFFSET = struct.calcsize(">qii")
+
+ # Minimum space requirements for Record V0
+ MIN_SLICE = LOG_OVERHEAD + LegacyRecordBatch.RECORD_OVERHEAD_V0
+
+ def __init__(self, bytes_data):
+ self._buffer = bytes_data
+ self._pos = 0
+ # We keep one slice ahead so `has_next` will return very fast
+ self._next_slice = None
+ self._remaining_bytes = None
+ self._cache_next()
+
+ def size_in_bytes(self):
+ return len(self._buffer)
+
+ def valid_bytes(self):
+ # We need to read the whole buffer to get the valid_bytes.
+ # NOTE: in Fetcher we do the call after iteration, so should be fast
+ if self._remaining_bytes is None:
+ next_slice = self._next_slice
+ pos = self._pos
+ while self._remaining_bytes is None:
+ self._cache_next()
+ # Reset previous iterator position
+ self._next_slice = next_slice
+ self._pos = pos
+ return len(self._buffer) - self._remaining_bytes
+
+ # NOTE: we cache offsets here as kwargs for a bit more speed, as cPython
+ # will use LOAD_FAST opcode in this case
+ def _cache_next(self, len_offset=LENGTH_OFFSET, log_overhead=LOG_OVERHEAD):
+ buffer = self._buffer
+ buffer_len = len(buffer)
+ pos = self._pos
+ remaining = buffer_len - pos
+ if remaining < log_overhead:
+ # Will be re-checked in Fetcher for remaining bytes.
+ self._remaining_bytes = remaining
+ self._next_slice = None
+ return
+
+ length, = struct.unpack_from(
+ ">i", buffer, pos + len_offset)
+
+ slice_end = pos + log_overhead + length
+ if slice_end > buffer_len:
+ # Will be re-checked in Fetcher for remaining bytes
+ self._remaining_bytes = remaining
+ self._next_slice = None
+ return
+
+ self._next_slice = memoryview(buffer)[pos: slice_end]
+ self._pos = slice_end
+
+ def has_next(self):
+ return self._next_slice is not None
+
+ # NOTE: same cache for LOAD_FAST as above
+ def next_batch(self, _min_slice=MIN_SLICE,
+ _magic_offset=MAGIC_OFFSET):
+ next_slice = self._next_slice
+ if next_slice is None:
+ return None
+ if len(next_slice) < _min_slice:
+ raise CorruptRecordException(
+ "Record size is less than the minimum record overhead "
+ "({})".format(_min_slice - self.LOG_OVERHEAD))
+ self._cache_next()
+ magic, = struct.unpack_from(">b", next_slice, _magic_offset)
+ if magic <= 1:
+ return LegacyRecordBatch(next_slice, magic)
+ else: # pragma: no cover
+ raise NotImplementedError("Record V2 still not implemented")
+
+
+class MemoryRecordsBuilder(object):
+
+ def __init__(self, magic, compression_type, batch_size):
+ assert magic in [0, 1], "Not supported magic"
+ assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+ self._builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type,
+ batch_size=batch_size)
+ self._batch_size = batch_size
+ self._buffer = None
+
+ self._next_offset = 0
+ self._closed = False
+ self._bytes_written = 0
+
+ def append(self, timestamp, key, value):
+ """ Append a message to the buffer.
+
+ Returns:
+ (int, int): checksum and bytes written
+ """
+ if self._closed:
+ return None, 0
+
+ offset = self._next_offset
+ checksum, actual_size = self._builder.append(
+ offset, timestamp, key, value)
+ # Return of 0 size means there's no space to add a new message
+ if actual_size == 0:
+ return None, 0
+
+ self._next_offset += 1
+ return checksum, actual_size
+
+ def close(self):
+ # This method may be called multiple times on the same batch
+ # i.e., on retries
+ # we need to make sure we only close it out once
+ # otherwise compressed messages may be double-compressed
+ # see Issue 718
+ if not self._closed:
+ self._bytes_written = self._builder.size()
+ self._buffer = bytes(self._builder.build())
+ self._builder = None
+ self._closed = True
+
+ def size_in_bytes(self):
+ if not self._closed:
+ return self._builder.size()
+ else:
+ return len(self._buffer)
+
+ def compression_rate(self):
+ assert self._closed
+ return self.size_in_bytes() / self._bytes_written
+
+ def is_full(self):
+ if self._closed:
+ return True
+ else:
+ return self._builder.size() >= self._batch_size
+
+ def next_offset(self):
+ return self._next_offset
+
+ def buffer(self):
+ assert self._closed
+ return self._buffer
diff --git a/kafka/record/util.py b/kafka/record/util.py
new file mode 100644
index 0000000..098d6f4
--- /dev/null
+++ b/kafka/record/util.py
@@ -0,0 +1,8 @@
+import binascii
+
+
+def calc_crc32(memview):
+ """ Calculate simple CRC-32 checksum over a memoryview of data
+ """
+ crc = binascii.crc32(memview) & 0xffffffff
+ return crc
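calc_crc32 is used both when writing (LegacyRecordBatchBuilder._encode_msg) and when validating (LegacyRecordBatch.validate_crc); the `& 0xffffffff` keeps the result unsigned, which is why the record tests later in this patch compare against e.g. `-2095076219 & 0xffffffff`. Illustrative only:

    from kafka.record.util import calc_crc32

    crc = calc_crc32(memoryview(b"arbitrary record bytes"))
    assert 0 <= crc <= 0xffffffff     # always a non-negative 32-bit value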
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
new file mode 100644
index 0000000..2d76695
--- /dev/null
+++ b/test/record/test_legacy_records.py
@@ -0,0 +1,85 @@
+import pytest
+from kafka.record.legacy_records import (
+ LegacyRecordBatch, LegacyRecordBatchBuilder
+)
+from kafka.protocol.message import Message
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_no_compression(magic):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=9999999)
+ builder.append(
+ 0, timestamp=9999999, key=b"test", value=b"Super")
+ buffer = builder.build()
+
+ batch = LegacyRecordBatch(bytes(buffer), magic)
+ msgs = list(batch)
+ assert len(msgs) == 1
+ msg = msgs[0]
+
+ assert msg.offset == 0
+ assert msg.timestamp == (9999999 if magic else None)
+ assert msg.timestamp_type == (0 if magic else None)
+ assert msg.key == b"test"
+ assert msg.value == b"Super"
+ assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff
+
+
+@pytest.mark.parametrize("compression_type", [
+ Message.CODEC_GZIP,
+ Message.CODEC_SNAPPY,
+ Message.CODEC_LZ4
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type, batch_size=9999999)
+ for offset in range(10):
+ builder.append(
+ offset, timestamp=9999999, key=b"test", value=b"Super")
+ buffer = builder.build()
+
+ batch = LegacyRecordBatch(bytes(buffer), magic)
+ msgs = list(batch)
+
+ expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
+ for offset, msg in enumerate(msgs):
+ assert msg.offset == offset
+ assert msg.timestamp == (9999999 if magic else None)
+ assert msg.timestamp_type == (0 if magic else None)
+ assert msg.key == b"test"
+ assert msg.value == b"Super"
+ assert msg.checksum == expected_checksum
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_written_bytes_equals_size_in_bytes(magic):
+ key = b"test"
+ value = b"Super"
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=9999999)
+
+ size_in_bytes = builder.size_in_bytes(
+ 0, timestamp=9999999, key=key, value=value)
+
+ pos = builder.size()
+ builder.append(0, timestamp=9999999, key=key, value=value)
+
+ assert builder.size() - pos == size_in_bytes
+
+
+@pytest.mark.parametrize("magic", [0, 1])
+def test_estimate_size_in_bytes_bigger_than_batch(magic):
+ key = b"Super Key"
+ value = b"1" * 100
+ estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, compression_type=0, key=key, value=value)
+
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=0, batch_size=9999999)
+ builder.append(
+ 0, timestamp=9999999, key=key, value=value)
+ buf = builder.build()
+ assert len(buf) <= estimate_size, \
+ "Estimate should always be upper bound"
diff --git a/test/record/test_records.py b/test/record/test_records.py
new file mode 100644
index 0000000..fc3eaca
--- /dev/null
+++ b/test/record/test_records.py
@@ -0,0 +1,108 @@
+import pytest
+from kafka.record import MemoryRecords
+from kafka.errors import CorruptRecordException
+
+record_batch_data_v1 = [
+ # First Message value == "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
+ b'\x00\x01^\x18g\xab\xae\xff\xff\xff\xff\x00\x00\x00\x03123',
+ # Second Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x16\xef\x98\xc9 \x01\x00'
+ b'\x00\x00\x01^\x18g\xaf\xc0\xff\xff\xff\xff\x00\x00\x00\x00',
+ # Third Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x16_\xaf\xfb^\x01\x00\x00'
+ b'\x00\x01^\x18g\xb0r\xff\xff\xff\xff\x00\x00\x00\x00',
+ # Fourth Message value = "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x19\xa8\x12W \x01\x00\x00'
+ b'\x00\x01^\x18g\xb8\x03\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+# This is real live data from a Kafka 0.10 broker
+record_batch_data_v0 = [
+ # First Message value == "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+ b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123',
+ # Second Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+ b'\xff\xff\xff\x00\x00\x00\x00',
+ # Third Message value == ""
+ b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x0eyWH\xe0\x00\x00\xff'
+ b'\xff\xff\xff\x00\x00\x00\x00',
+ # Fourth Message value = "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00'
+ b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
+]
+
+
+def test_memory_records_v1():
+ data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
+ records = MemoryRecords(data_bytes)
+
+ assert records.size_in_bytes() == 146
+ assert records.valid_bytes() == 142
+
+ assert records.has_next() is True
+ batch = records.next_batch()
+ recs = list(batch)
+ assert len(recs) == 1
+ assert recs[0].value == b"123"
+ assert recs[0].key is None
+ assert recs[0].timestamp == 1503648000942
+ assert recs[0].timestamp_type == 0
+ assert recs[0].checksum == 1199974594 & 0xffffffff
+
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+
+ assert records.has_next() is False
+ assert records.next_batch() is None
+ assert records.next_batch() is None
+
+
+def test_memory_records_v0():
+ data_bytes = b"".join(record_batch_data_v0)
+ records = MemoryRecords(data_bytes + b"\x00" * 4)
+
+ assert records.size_in_bytes() == 114
+ assert records.valid_bytes() == 110
+
+ records = MemoryRecords(data_bytes)
+
+ assert records.has_next() is True
+ batch = records.next_batch()
+ recs = list(batch)
+ assert len(recs) == 1
+ assert recs[0].value == b"123"
+ assert recs[0].key is None
+ assert recs[0].timestamp is None
+ assert recs[0].timestamp_type is None
+ assert recs[0].checksum == -22012481 & 0xffffffff
+
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+
+ assert records.has_next() is False
+ assert records.next_batch() is None
+ assert records.next_batch() is None
+
+
+def test_memory_records_corrupt():
+ records = MemoryRecords(b"")
+ assert records.size_in_bytes() == 0
+ assert records.valid_bytes() == 0
+ assert records.has_next() is False
+
+ records = MemoryRecords(b"\x00\x00\x00")
+ assert records.size_in_bytes() == 3
+ assert records.valid_bytes() == 0
+ assert records.has_next() is False
+
+ records = MemoryRecords(
+ b"\x00\x00\x00\x00\x00\x00\x00\x03" # Offset=3
+ b"\x00\x00\x00\x03" # Length=3
+ b"\xfe\xb0\x1d", # Some random bytes
+ )
+ with pytest.raises(CorruptRecordException):
+ records.next_batch()
diff --git a/test/test_buffer.py b/test/test_buffer.py
deleted file mode 100644
index db6cbb3..0000000
--- a/test/test_buffer.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# pylint: skip-file
-from __future__ import absolute_import
-
-import io
-import platform
-
-import pytest
-
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.protocol.message import Message, MessageSet
-
-
-def test_buffer_close():
- records = MessageSetBuffer(io.BytesIO(), 100000)
- orig_msg = Message(b'foobar')
- records.append(1234, orig_msg)
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 1234
- assert msg == orig_msg
-
- # Closing again should work fine
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 1234
- assert msg == orig_msg
-
-
-@pytest.mark.parametrize('compression', [
- 'gzip',
- 'snappy',
- pytest.mark.skipif(platform.python_implementation() == 'PyPy',
- reason='python-lz4 crashes on older versions of pypy')('lz4'),
-])
-def test_compressed_buffer_close(compression):
- records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
- orig_msg = Message(b'foobar')
- records.append(1234, orig_msg)
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 0
- assert msg.is_compressed()
-
- msgset = msg.decompress()
- (offset, size, msg) = msgset[0]
- assert not msg.is_compressed()
- assert offset == 1234
- assert msg == orig_msg
-
- # Closing again should work fine
- records.close()
-
- msgset = MessageSet.decode(records.buffer())
- assert len(msgset) == 1
- (offset, size, msg) = msgset[0]
- assert offset == 0
- assert msg.is_compressed()
-
- msgset = msg.decompress()
- (offset, size, msg) = msgset[0]
- assert not msg.is_compressed()
- assert offset == 1234
- assert msg == orig_msg
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17e7401..d1843b3 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -26,6 +26,8 @@ from test.testutil import (
class TestConsumerIntegration(KafkaIntegrationTestCase):
+ maxDiff = None
+
@classmethod
def setUpClass(cls):
if not os.environ.get('KAFKA_VERSION'):
@@ -648,10 +650,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
kafka_producer = self.kafka_producer()
early_msg = kafka_producer.send(
self.topic, partition=0, value=b"first",
- timestamp_ms=early_time).get()
+ timestamp_ms=early_time).get(1)
late_msg = kafka_producer.send(
self.topic, partition=0, value=b"last",
- timestamp_ms=late_time).get()
+ timestamp_ms=late_time).get(1)
consumer = self.kafka_consumer()
offsets = consumer.offsets_for_times({tp: early_time})
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0203614..d963650 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -260,13 +260,14 @@ def test_decode_fetch_response_partial():
struct.pack('>i', 8), # Length of value
b'ar', # Value (truncated)
])
-
resp = FetchResponse[0].decode(io.BytesIO(encoded))
assert len(resp.topics) == 1
topic, partitions = resp.topics[0]
assert topic == 'foobar'
assert len(partitions) == 2
- m1 = partitions[0][3]
+
+ m1 = MessageSet.decode(
+ partitions[0][3], bytes_to_read=len(partitions[0][3]))
assert len(m1) == 2
assert m1[1] == (None, None, PartialMessage())
diff --git a/test/test_sender.py b/test/test_sender.py
index f37e194..2a68def 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -1,20 +1,17 @@
# pylint: skip-file
from __future__ import absolute_import
-import io
-
import pytest
+import io
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
-import kafka.errors as Errors
-from kafka.future import Future
from kafka.metrics import Metrics
-from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
from kafka.producer.sender import Sender
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition
@pytest.fixture
@@ -47,7 +44,10 @@ def sender(client, accumulator, metrics):
def test_produce_request(sender, mocker, api_version, produce_version):
sender.config['api_version'] = api_version
tp = TopicPartition('foo', 0)
- records = MessageSetBuffer(io.BytesIO(), 100000)
- batch = RecordBatch(tp, records)
+ buffer = io.BytesIO()
+ records = MemoryRecordsBuilder(
+ magic=1, compression_type=0, batch_size=100000)
+ batch = ProducerBatch(tp, records, buffer)
+ records.close()
produce_request = sender._produce_request(0, 0, 0, [batch])
assert isinstance(produce_request, ProduceRequest[produce_version])