Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--  kafka/producer/buffer.py  126
1 file changed, 1 insertion(+), 125 deletions(-)
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index d1eeaf1..19ea732 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -5,133 +5,9 @@ import io
import threading
import time

-from ..codec import (has_gzip, has_snappy, has_lz4,
-                     gzip_encode, snappy_encode,
-                     lz4_encode, lz4_encode_old_kafka)
-from .. import errors as Errors
from ..metrics.stats import Rate
-from ..protocol.types import Int32, Int64
-from ..protocol.message import MessageSet, Message
-
-
-class MessageSetBuffer(object):
-    """Wrap a buffer for writing MessageSet batches.
-
-    Arguments:
-        buf (IO stream): a buffer for writing data. Typically BytesIO.
-        batch_size (int): maximum number of bytes to write to the buffer.
-
-    Keyword Arguments:
-        compression_type ('gzip', 'snappy', None): compress messages before
-            publishing. Default: None.
-    """
-    _COMPRESSORS = {
-        'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
-        'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
-        'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
-        'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
-    }
-    def __init__(self, buf, batch_size, compression_type=None, message_version=0):
-        if compression_type is not None:
-            assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
-
-            # Kafka 0.8/0.9 had a quirky lz4...
-            if compression_type == 'lz4' and message_version == 0:
-                compression_type = 'lz4-old-kafka'
-
-            checker, encoder, attributes = self._COMPRESSORS[compression_type]
-            assert checker(), 'Compression Libraries Not Found'
-            self._compressor = encoder
-            self._compression_attributes = attributes
-        else:
-            self._compressor = None
-            self._compression_attributes = None
-
-        self._message_version = message_version
-        self._buffer = buf
-        # Init MessageSetSize to 0 -- update on close
-        self._buffer.seek(0)
-        self._buffer.write(Int32.encode(0))
-        self._batch_size = batch_size
-        self._closed = False
-        self._messages = 0
-        self._bytes_written = 4  # Int32 header is 4 bytes
-        self._final_size = None
-
-    def append(self, offset, message):
-        """Append a Message to the MessageSet.
-
-        Arguments:
-            offset (int): offset of the message
-            message (Message or bytes): message struct or encoded bytes
-
-        Returns: bytes written
-        """
-        if isinstance(message, Message):
-            encoded = message.encode()
-        else:
-            encoded = bytes(message)
-        msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
-        self._buffer.write(msg)
-        self._messages += 1
-        self._bytes_written += len(msg)
-        return len(msg)
-
-    def has_room_for(self, key, value):
-        if self._closed:
-            return False
-        if not self._messages:
-            return True
-        needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key is not None:
-            needed_bytes += len(key)
-        if value is not None:
-            needed_bytes += len(value)
-        return self._buffer.tell() + needed_bytes < self._batch_size
-
-    def is_full(self):
-        if self._closed:
-            return True
-        return self._buffer.tell() >= self._batch_size
-
-    def close(self):
-        # This method may be called multiple times on the same batch
-        # i.e., on retries
-        # we need to make sure we only close it out once
-        # otherwise compressed messages may be double-compressed
-        # see Issue 718
-        if not self._closed:
-            if self._compressor:
-                # TODO: avoid copies with bytearray / memoryview
-                uncompressed_size = self._buffer.tell()
-                self._buffer.seek(4)
-                msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
-                              attributes=self._compression_attributes,
-                              magic=self._message_version)
-                encoded = msg.encode()
-                self._buffer.seek(4)
-                self._buffer.write(Int64.encode(0))  # offset 0 for wrapper msg
-                self._buffer.write(Int32.encode(len(encoded)))
-                self._buffer.write(encoded)
-
-            # Update the message set size (less the 4 byte header),
-            # and return with buffer ready for full read()
-            self._final_size = self._buffer.tell()
-            self._buffer.seek(0)
-            self._buffer.write(Int32.encode(self._final_size - 4))
-
-            self._buffer.seek(0)
-            self._closed = True
-
-    def size_in_bytes(self):
-        return self._final_size or self._buffer.tell()
-
-    def compression_rate(self):
-        return self.size_in_bytes() / self._bytes_written
-
-    def buffer(self):
-        return self._buffer

+import kafka.errors as Errors


class SimpleBufferPool(object):
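
For readers following the removal: the deleted class hand-wrote the legacy (message format v0/v1) MessageSet framing. It reserved a 4-byte MessageSetSize header initialized to zero, appended one [Int64 offset][Int32 size][message bytes] entry per message, and in close() optionally replaced all entries with a single compressed wrapper message at offset 0 before patching the size header. A minimal standalone sketch of that framing, using struct in place of the library's Int32/Int64 codecs (the helper name and plain-bytes entries are illustrative, not kafka-python's API):

```python
import io
import struct


def frame_message_set(entries):
    """Frame (offset, encoded_message_bytes) pairs as a legacy Kafka MessageSet."""
    buf = io.BytesIO()
    buf.write(struct.pack('>i', 0))  # MessageSetSize placeholder, patched below
    for offset, encoded in entries:
        buf.write(struct.pack('>q', offset))        # Int64 offset (big-endian)
        buf.write(struct.pack('>i', len(encoded)))  # Int32 message size
        buf.write(encoded)
    final_size = buf.tell()
    buf.seek(0)
    # The size field excludes the 4-byte header itself, mirroring
    # close()'s Int32.encode(self._final_size - 4).
    buf.write(struct.pack('>i', final_size - 4))
    return buf.getvalue()
```

Because close() rewrote the buffer in place, running it twice on a retry would have compressed the already-compressed wrapper message a second time; the self._closed guard (see Issue 718 in the removed comments) existed to prevent exactly that.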