author    Dana Powers <dana.powers@gmail.com>  2016-03-13 23:19:53 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-03-13 23:19:53 -0700
commit    0c272a05b3b29515759620803053d091ef98260d (patch)
tree      0bc1548580f50008b23918f2312b131fafff0e0b
parent    a45cd4d17bd7f6d1fe9ae887f5847182a799ca07 (diff)
parent    4ffd4e94e05e9494bd5ec32bd1037f65ed820986 (diff)
download  kafka-python-0c272a05b3b29515759620803053d091ef98260d.tar.gz
Merge pull request #585 from dpkp/truncate_buffer
Truncate deallocated message buffers
-rw-r--r--  kafka/producer/buffer.py  17
-rw-r--r--  test/test_producer.py     27
2 files changed, 27 insertions(+), 17 deletions(-)
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 74ba5da..8c83ffc 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -191,19 +191,12 @@ class SimpleBufferPool(object):
             buf (io.BytesIO): The buffer to return
         """
         with self._lock:
-            capacity = buf.seek(0, 2)
-
-            # free extra memory if needed
-            if capacity > self._poolable_size:
-                # BytesIO (cpython) only frees memory if 2x reduction or more
-                trunc_to = int(min(capacity / 2, self._poolable_size))
-                buf.truncate(trunc_to)
-
-            buf.seek(0)
-            #buf.write(bytearray(12))
-            #buf.seek(0)
+            # BytesIO.truncate here makes the pool somewhat pointless
+            # but we stick with the BufferPool API until migrating to
+            # bytearray / memoryview. The buffer we return must not
+            # expose any prior data on read().
+            buf.truncate(0)
             self._free.append(buf)
-
             if self._waiters:
                 self._waiters[0].notify()
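
Why truncate(0)? A minimal sketch (not part of the patch) using only the standard library's io.BytesIO shows how a pooled buffer leaks stale bytes if it is returned without being fully truncated:

    import io

    buf = io.BytesIO()
    buf.write(b'old message bytes')   # first owner writes a message

    # Returned to the pool untruncated, the next owner can read the
    # previous owner's data:
    buf.seek(0)
    assert buf.read() == b'old message bytes'

    # What deallocate() now does: drop the contents entirely, so a
    # reused buffer reads back empty.
    buf.truncate(0)
    buf.seek(0)
    assert buf.read() == b''

The deleted code truncated only down to min(capacity / 2, poolable_size) (per its comment, CPython's BytesIO only frees memory on a 2x reduction), so a reused buffer could still read() bytes written by its previous owner.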
diff --git a/test/test_producer.py b/test/test_producer.py
index 829c6f2..f11bb05 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -3,10 +3,23 @@ import sys
 import pytest
 
 from kafka import KafkaConsumer, KafkaProducer
+from kafka.producer.buffer import SimpleBufferPool
 from test.conftest import version
 from test.testutil import random_string
 
 
+def test_buffer_pool():
+    pool = SimpleBufferPool(1000, 1000)
+
+    buf1 = pool.allocate(1000, 1000)
+    message = ''.join(map(str, range(100)))
+    buf1.write(message.encode('utf-8'))
+    pool.deallocate(buf1)
+
+    buf2 = pool.allocate(1000, 1000)
+    assert buf2.read() == b''
+
+
 @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
 def test_end_to_end(kafka_broker, compression):
@@ -33,17 +46,21 @@ def test_end_to_end(kafka_broker, compression):
 
     topic = random_string(5)
 
-    for i in range(1000):
-        producer.send(topic, 'msg %d' % i)
-    producer.flush(timeout=30)
+    messages = 100
+    futures = []
+    for i in range(messages):
+        futures.append(producer.send(topic, 'msg %d' % i))
+    ret = [f.get(timeout=30) for f in futures]
+    assert len(ret) == messages
+
     producer.close()
 
     consumer.subscribe([topic])
     msgs = set()
-    for i in range(1000):
+    for i in range(messages):
        try:
            msgs.add(next(consumer).value)
        except StopIteration:
            break
 
-    assert msgs == set(['msg %d' % i for i in range(1000)])
+    assert msgs == set(['msg %d' % i for i in range(messages)])
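
The revised test also swaps flush() for per-message futures. A sketch of that pattern outside the test harness (assumes a reachable broker; 'localhost:9092' and the topic name are placeholders):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    # send() is asynchronous and returns a future per record.
    futures = [producer.send('my-topic', ('msg %d' % i).encode('utf-8'))
               for i in range(100)]

    # Unlike flush(), get() surfaces per-message errors and returns a
    # RecordMetadata for each acknowledged send.
    results = [f.get(timeout=30) for f in futures]
    producer.close()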