diff options
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r-- | kafka/producer/buffer.py | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 5fcb35f..de5f0e7 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import collections import io @@ -55,6 +55,8 @@ class MessageSetBuffer(object): self._batch_size = batch_size self._closed = False self._messages = 0 + self._bytes_written = 4 # Int32 header is 4 bytes + self._final_size = None def append(self, offset, message): """Apend a Message to the MessageSet. @@ -62,6 +64,8 @@ class MessageSetBuffer(object): Arguments: offset (int): offset of the message message (Message or bytes): message struct or encoded bytes + + Returns: bytes written """ if isinstance(message, Message): encoded = message.encode() @@ -70,6 +74,8 @@ class MessageSetBuffer(object): msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded self._buffer.write(msg) self._messages += 1 + self._bytes_written += len(msg) + return len(msg) def has_room_for(self, key, value): if self._closed: @@ -107,16 +113,20 @@ class MessageSetBuffer(object): self._buffer.write(Int32.encode(len(encoded))) self._buffer.write(encoded) - # Update the message set size, and return ready for full read() - size = self._buffer.tell() - 4 + # Update the message set size (less the 4 byte header), + # and return with buffer ready for full read() + self._final_size = self._buffer.tell() self._buffer.seek(0) - self._buffer.write(Int32.encode(size)) + self._buffer.write(Int32.encode(self._final_size - 4)) self._buffer.seek(0) self._closed = True def size_in_bytes(self): - return self._buffer.tell() + return self._final_size or self._buffer.tell() + + def compression_rate(self): + return self.size_in_bytes() / self._bytes_written def buffer(self): return self._buffer |