| author | Dana Powers <dana.powers@gmail.com> | 2016-03-13 23:19:53 -0700 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-03-13 23:19:53 -0700 |
| commit | 0c272a05b3b29515759620803053d091ef98260d (patch) | |
| tree | 0bc1548580f50008b23918f2312b131fafff0e0b | |
| parent | a45cd4d17bd7f6d1fe9ae887f5847182a799ca07 (diff) | |
| parent | 4ffd4e94e05e9494bd5ec32bd1037f65ed820986 (diff) | |
| download | kafka-python-0c272a05b3b29515759620803053d091ef98260d.tar.gz | |
Merge pull request #585 from dpkp/truncate_buffer
Truncate deallocated message buffers
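For context, a minimal standalone sketch of the bug this merge fixes, using plain `io.BytesIO` rather than any kafka-python API: a pooled buffer that is merely rewound on deallocation still exposes its previous contents to the next allocation, whereas truncating it guarantees a clean read.

```python
import io

# Hypothetical illustration only -- plain io.BytesIO, not kafka-python code.
buf = io.BytesIO()
buf.write(b'msg 0msg 1msg 2')  # producer fills the pooled buffer

# Pre-patch deallocate() only rewound the cursor (and truncated oversized
# buffers), so the next user of the buffer could read back stale bytes.
buf.seek(0)
assert buf.read() == b'msg 0msg 1msg 2'  # prior data still visible

# Post-patch deallocate() empties the buffer before returning it to the pool.
buf.truncate(0)
buf.seek(0)
assert buf.read() == b''  # nothing leaks to the next allocation
```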
-rw-r--r-- | kafka/producer/buffer.py | 17
-rw-r--r-- | test/test_producer.py | 27

2 files changed, 27 insertions, 17 deletions
```diff
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 74ba5da..8c83ffc 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -191,19 +191,12 @@ class SimpleBufferPool(object):
             buffer_ (io.BytesIO): The buffer to return
         """
         with self._lock:
-            capacity = buf.seek(0, 2)
-
-            # free extra memory if needed
-            if capacity > self._poolable_size:
-                # BytesIO (cpython) only frees memory if 2x reduction or more
-                trunc_to = int(min(capacity / 2, self._poolable_size))
-                buf.truncate(trunc_to)
-
-            buf.seek(0)
-            #buf.write(bytearray(12))
-            #buf.seek(0)
+            # BytesIO.truncate here makes the pool somewhat pointless
+            # but we stick with the BufferPool API until migrating to
+            # bytesarray / memoryview. The buffer we return must not
+            # expose any prior data on read().
+            buf.truncate(0)
             self._free.append(buf)
-
             if self._waiters:
                 self._waiters[0].notify()

diff --git a/test/test_producer.py b/test/test_producer.py
index 829c6f2..f11bb05 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -3,10 +3,23 @@ import sys
 import pytest
 
 from kafka import KafkaConsumer, KafkaProducer
+from kafka.producer.buffer import SimpleBufferPool
 from test.conftest import version
 from test.testutil import random_string
 
 
+def test_buffer_pool():
+    pool = SimpleBufferPool(1000, 1000)
+
+    buf1 = pool.allocate(1000, 1000)
+    message = ''.join(map(str, range(100)))
+    buf1.write(message.encode('utf-8'))
+    pool.deallocate(buf1)
+
+    buf2 = pool.allocate(1000, 1000)
+    assert buf2.read() == b''
+
+
 @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
 def test_end_to_end(kafka_broker, compression):
@@ -33,17 +46,21 @@ def test_end_to_end(kafka_broker, compression):
 
     topic = random_string(5)
 
-    for i in range(1000):
-        producer.send(topic, 'msg %d' % i)
-    producer.flush(timeout=30)
+    messages = 100
+    futures = []
+    for i in range(messages):
+        futures.append(producer.send(topic, 'msg %d' % i))
+    ret = [f.get(timeout=30) for f in futures]
+    assert len(ret) == messages
+
     producer.close()
 
     consumer.subscribe([topic])
     msgs = set()
-    for i in range(1000):
+    for i in range(messages):
         try:
             msgs.add(next(consumer).value)
         except StopIteration:
             break
 
-    assert msgs == set(['msg %d' % i for i in range(1000)])
+    assert msgs == set(['msg %d' % i for i in range(messages)])
```
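Note that the patched deallocate() also drops the old `buf.seek(0)` call entirely. A sketch of the `io.BytesIO` semantics it relies on (standalone illustration, not kafka-python code): truncate(0) empties the stream without moving the cursor, but reading past EOF still yields `b''`, so no prior data can be exposed on read() regardless of cursor position.

```python
import io

# io.BytesIO semantics behind the one-line fix.
buf = io.BytesIO(b'leftover payload')
buf.seek(0, 2)            # cursor at end, as after a producer write
buf.truncate(0)           # size is now 0; the cursor stays at offset 16
assert buf.tell() == 16   # truncate() does not change the stream position
assert buf.read() == b''  # reading past EOF returns empty
buf.seek(0)
assert buf.read() == b''  # and from offset 0 nothing is exposed
```

The new test_buffer_pool test checks exactly this guarantee at the pool level: a buffer that is written to, deallocated, and reallocated must read back empty.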