summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py20
1 files changed, 15 insertions, 5 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5fcb35f..de5f0e7 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import io
@@ -55,6 +55,8 @@ class MessageSetBuffer(object):
self._batch_size = batch_size
self._closed = False
self._messages = 0
+ self._bytes_written = 4 # Int32 header is 4 bytes
+ self._final_size = None
def append(self, offset, message):
"""Append a Message to the MessageSet.
@@ -62,6 +64,8 @@ class MessageSetBuffer(object):
Arguments:
offset (int): offset of the message
message (Message or bytes): message struct or encoded bytes
+
+ Returns: number of bytes written to the buffer for this message (offset and length prefix included)
"""
if isinstance(message, Message):
encoded = message.encode()
@@ -70,6 +74,8 @@ class MessageSetBuffer(object):
msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
self._buffer.write(msg)
self._messages += 1
+ self._bytes_written += len(msg)
+ return len(msg)
def has_room_for(self, key, value):
if self._closed:
@@ -107,16 +113,20 @@ class MessageSetBuffer(object):
self._buffer.write(Int32.encode(len(encoded)))
self._buffer.write(encoded)
- # Update the message set size, and return ready for full read()
- size = self._buffer.tell() - 4
+ # Update the message set size (less the 4 byte header),
+ # and return with buffer ready for full read()
+ self._final_size = self._buffer.tell()
self._buffer.seek(0)
- self._buffer.write(Int32.encode(size))
+ self._buffer.write(Int32.encode(self._final_size - 4))
self._buffer.seek(0)
self._closed = True
def size_in_bytes(self):
- return self._buffer.tell()
+ return self._final_size or self._buffer.tell()
+
+ def compression_rate(self):
+ return self.size_in_bytes() / self._bytes_written
def buffer(self):
return self._buffer