summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py10
1 files changed, 6 insertions, 4 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 1a2dd71..a95bb87 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -30,8 +30,6 @@ class MessageSetBuffer(object):
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None):
- assert batch_size > 0, 'batch_size must be > 0'
-
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -121,7 +119,7 @@ class SimpleBufferPool(object):
self._poolable_size = poolable_size
self._lock = threading.RLock()
- buffers = int(memory / poolable_size)
+ buffers = int(memory / poolable_size) if poolable_size else 0
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
self._waiters = collections.deque()
@@ -130,12 +128,13 @@ class SimpleBufferPool(object):
#MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
#this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
- def allocate(self, max_time_to_block_ms):
+ def allocate(self, size, max_time_to_block_ms):
"""
Allocate a buffer of the given size. This method blocks if there is not
enough memory and the buffer pool is configured with blocking mode.
Arguments:
+ size (int): The buffer size to allocate in bytes [ignored]
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
@@ -147,6 +146,9 @@ class SimpleBufferPool(object):
if self._free:
return self._free.popleft()
+ elif self._poolable_size == 0:
+ return io.BytesIO()
+
else:
# we are out of buffers and will have to block
buf = None