summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-25 17:14:35 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 17:14:35 -0800
commit0a74a9eec07f148ba3554ed70e4bbda901bbcb6b (patch)
tree31ed6661423eb248cc1e1d01faa3ac75262c2a4d
parent78bbc6d4d4ad67a7af32e10b08cc89ddfdd86322 (diff)
downloadkafka-python-0a74a9eec07f148ba3554ed70e4bbda901bbcb6b.tar.gz
Python3 does not support buffer -- use memoryview in snappy_decode
-rw-r--r--kafka/codec.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 9eaeeca..5adb2e5 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -2,6 +2,7 @@ import gzip
from io import BytesIO
import struct
+import six
from six.moves import xrange
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
@@ -100,10 +101,15 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
out.write(struct.pack('!' + fmt, dat))
# Chunk through buffers to avoid creating intermediate slice copies
- for chunk in (buffer(payload, i, xerial_blocksize)
+ if six.PY2:
+ chunker = lambda payload, i, size: buffer(payload, i, size)
+ else:
+ chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()
+
+ for chunk in (chunker(payload, i, xerial_blocksize)
for i in xrange(0, len(payload), xerial_blocksize)):
- block = snappy.compress(chunk)
+ block = snappy.compress(chunk) # this wont accept a raw memoryview...?
block_size = len(block)
out.write(struct.pack('!i', block_size))
out.write(block)