diff options
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r-- | kafka/producer/buffer.py | 126 |
1 files changed, 1 insertions, 125 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index d1eeaf1..19ea732 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -5,133 +5,9 @@ import io import threading import time -from ..codec import (has_gzip, has_snappy, has_lz4, - gzip_encode, snappy_encode, - lz4_encode, lz4_encode_old_kafka) -from .. import errors as Errors from ..metrics.stats import Rate -from ..protocol.types import Int32, Int64 -from ..protocol.message import MessageSet, Message - - -class MessageSetBuffer(object): - """Wrap a buffer for writing MessageSet batches. - - Arguments: - buf (IO stream): a buffer for writing data. Typically BytesIO. - batch_size (int): maximum number of bytes to write to the buffer. - - Keyword Arguments: - compression_type ('gzip', 'snappy', None): compress messages before - publishing. Default: None. - """ - _COMPRESSORS = { - 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP), - 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY), - 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), - 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4), - } - def __init__(self, buf, batch_size, compression_type=None, message_version=0): - if compression_type is not None: - assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' - - # Kafka 0.8/0.9 had a quirky lz4... - if compression_type == 'lz4' and message_version == 0: - compression_type = 'lz4-old-kafka' - - checker, encoder, attributes = self._COMPRESSORS[compression_type] - assert checker(), 'Compression Libraries Not Found' - self._compressor = encoder - self._compression_attributes = attributes - else: - self._compressor = None - self._compression_attributes = None - - self._message_version = message_version - self._buffer = buf - # Init MessageSetSize to 0 -- update on close - self._buffer.seek(0) - self._buffer.write(Int32.encode(0)) - self._batch_size = batch_size - self._closed = False - self._messages = 0 - self._bytes_written = 4 # Int32 header is 4 bytes - self._final_size = None - - def append(self, offset, message): - """Append a Message to the MessageSet. - - Arguments: - offset (int): offset of the message - message (Message or bytes): message struct or encoded bytes - - Returns: bytes written - """ - if isinstance(message, Message): - encoded = message.encode() - else: - encoded = bytes(message) - msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded - self._buffer.write(msg) - self._messages += 1 - self._bytes_written += len(msg) - return len(msg) - - def has_room_for(self, key, value): - if self._closed: - return False - if not self._messages: - return True - needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE - if key is not None: - needed_bytes += len(key) - if value is not None: - needed_bytes += len(value) - return self._buffer.tell() + needed_bytes < self._batch_size - - def is_full(self): - if self._closed: - return True - return self._buffer.tell() >= self._batch_size - - def close(self): - # This method may be called multiple times on the same batch - # i.e., on retries - # we need to make sure we only close it out once - # otherwise compressed messages may be double-compressed - # see Issue 718 - if not self._closed: - if self._compressor: - # TODO: avoid copies with bytearray / memoryview - uncompressed_size = self._buffer.tell() - self._buffer.seek(4) - msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)), - attributes=self._compression_attributes, - magic=self._message_version) - encoded = msg.encode() - self._buffer.seek(4) - self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg - self._buffer.write(Int32.encode(len(encoded))) - self._buffer.write(encoded) - - # Update the message set size (less the 4 byte header), - # and return with buffer ready for full read() - self._final_size = self._buffer.tell() - self._buffer.seek(0) - self._buffer.write(Int32.encode(self._final_size - 4)) - - self._buffer.seek(0) - self._closed = True - - def size_in_bytes(self): - return self._final_size or self._buffer.tell() - - def compression_rate(self): - return self.size_in_bytes() / self._bytes_written - - def buffer(self): - return self._buffer +import kafka.errors as Errors class SimpleBufferPool(object): |