summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-14 08:35:24 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-14 20:43:03 -0700
commit594f7079da4fc1598966dcc82caaf73532dea4d4 (patch)
treea1349c4a71fcee681d1670509255d2cf4e0e3ad8
parent1eb7e05c323322818fb60192f638d6b83f2fd1ef (diff)
downloadkafka-python-594f7079da4fc1598966dcc82caaf73532dea4d4.tar.gz
Test MessageSetBuffer close -- cover double close compression bug
-rw-r--r--test/test_buffer.py70
1 files changed, 70 insertions, 0 deletions
diff --git a/test/test_buffer.py b/test/test_buffer.py
new file mode 100644
index 0000000..c8e283d
--- /dev/null
+++ b/test/test_buffer.py
@@ -0,0 +1,70 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import io
+
+import pytest
+
+from kafka.producer.buffer import MessageSetBuffer
+from kafka.protocol.message import Message, MessageSet
+
+
+def test_buffer_close():
+ records = MessageSetBuffer(io.BytesIO(), 100000)
+ orig_msg = Message(b'foobar')
+ records.append(1234, orig_msg)
+ records.close()
+
+ msgset = MessageSet.decode(records.buffer())
+ assert len(msgset) == 1
+ (offset, size, msg) = msgset[0]
+ assert offset == 1234
+ assert msg == orig_msg
+
+ # Closing again should work fine
+ records.close()
+
+ msgset = MessageSet.decode(records.buffer())
+ assert len(msgset) == 1
+ (offset, size, msg) = msgset[0]
+ assert offset == 1234
+ assert msg == orig_msg
+
+
+@pytest.mark.parametrize('compression', [
+ 'gzip',
+ 'snappy',
+ pytest.mark.skipif("sys.version_info < (2,7)")('lz4'), # lz4tools does not work on py26
+])
+def test_compressed_buffer_close(compression):
+ records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
+ orig_msg = Message(b'foobar')
+ records.append(1234, orig_msg)
+ records.close()
+
+ msgset = MessageSet.decode(records.buffer())
+ assert len(msgset) == 1
+ (offset, size, msg) = msgset[0]
+ assert offset == 0
+ assert msg.is_compressed()
+
+ msgset = msg.decompress()
+ (offset, size, msg) = msgset[0]
+ assert not msg.is_compressed()
+ assert offset == 1234
+ assert msg == orig_msg
+
+ # Closing again should work fine
+ records.close()
+
+ msgset = MessageSet.decode(records.buffer())
+ assert len(msgset) == 1
+ (offset, size, msg) = msgset[0]
+ assert offset == 0
+ assert msg.is_compressed()
+
+ msgset = msg.decompress()
+ (offset, size, msg) = msgset[0]
+ assert not msg.is_compressed()
+ assert offset == 1234
+ assert msg == orig_msg