author    Dana Powers <dana.powers@gmail.com>  2016-02-17 22:43:21 -0800
committer Dana Powers <dana.powers@gmail.com>  2016-02-17 22:43:21 -0800
commit    97fd705a234fae1d4252e02a47ab0b6b70fde12b (patch)
tree      3ea4aad2fd4bc0e54e0bf7b4165bb0f00e3ad6a2
parent    d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff)
download  kafka-python-batch_size_zero.tar.gz

Support batch_size = 0 in producer buffers (branch: batch_size_zero)
-rw-r--r--  kafka/producer/buffer.py             | 10
-rw-r--r--  kafka/producer/record_accumulator.py |  2
2 files changed, 7 insertions(+), 5 deletions(-)
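
In effect, batch_size = 0 disables buffer pooling: the pool pre-allocates
nothing and hands out a fresh buffer sized to each message. A minimal usage
sketch (not part of this commit; the broker address and topic name are
assumptions for illustration):

    from kafka import KafkaProducer

    # batch_size=0 is now a valid setting: no pooled buffers, one
    # freshly allocated buffer per message.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             batch_size=0)
    producer.send('my-topic', b'payload')
    producer.flush()
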
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 1a2dd71..a95bb87 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -30,8 +30,6 @@ class MessageSetBuffer(object):
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None):
- assert batch_size > 0, 'batch_size must be > 0'
-
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -121,7 +119,7 @@ class SimpleBufferPool(object):
self._poolable_size = poolable_size
self._lock = threading.RLock()
- buffers = int(memory / poolable_size)
+ buffers = int(memory / poolable_size) if poolable_size else 0
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
self._waiters = collections.deque()
@@ -130,12 +128,13 @@ class SimpleBufferPool(object):
#MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
#this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
- def allocate(self, max_time_to_block_ms):
+ def allocate(self, size, max_time_to_block_ms):
"""
Allocate a buffer of the given size. This method blocks if there is not
enough memory and the buffer pool is configured with blocking mode.
Arguments:
+ size (int): The buffer size to allocate in bytes [ignored]
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
@@ -147,6 +146,9 @@ class SimpleBufferPool(object):
if self._free:
return self._free.popleft()
+ elif self._poolable_size == 0:
+ return io.BytesIO()
+
else:
# we are out of buffers and will have to block
buf = None
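
Condensed sketch of the pool logic after this patch (simplified from
kafka/producer/buffer.py above; the blocking/waiter path of the real class
is omitted):

    import collections
    import io
    import threading

    class SimpleBufferPoolSketch(object):
        def __init__(self, memory, poolable_size):
            self._poolable_size = poolable_size
            self._lock = threading.RLock()
            # poolable_size == 0 (i.e. batch_size == 0): pre-allocate nothing
            buffers = int(memory / poolable_size) if poolable_size else 0
            self._free = collections.deque([io.BytesIO() for _ in range(buffers)])

        def allocate(self, size, max_time_to_block_ms):
            with self._lock:
                if self._free:
                    return self._free.popleft()
                elif self._poolable_size == 0:
                    # pooling disabled: hand out a fresh buffer per request
                    return io.BytesIO()
                # the real implementation blocks here, for up to
                # max_time_to_block_ms, waiting for a buffer to be freed
                raise NotImplementedError('blocking path omitted in sketch')
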
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index c62926d..1e692ee 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -200,7 +200,7 @@ class RecordAccumulator(object):
size = max(self.config['batch_size'], message_size)
log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
- buf = self._free.allocate(max_time_to_block_ms)
+ buf = self._free.allocate(size, max_time_to_block_ms)
with self._tp_locks[tp]:
# Need to check if producer is closed again after grabbing the
# dequeue lock.
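
With batch_size = 0, max(batch_size, message_size) is always exactly the
message size, so every buffer is allocated to fit a single message. A tiny
worked illustration (values are hypothetical):

    batch_size = 0
    for message_size in (1, 123, 4096):
        size = max(batch_size, message_size)
        assert size == message_size  # one exact-size buffer per message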