summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-25 22:11:18 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 22:11:18 -0800
commitccf0de0f4d0e41ec7cded0ef0f053f3e8cf9b6f1 (patch)
treeaea053870008f097f23c8b119b910b0bceae6299
parenta2e9eb5214da94ee8d71a66315ed4a8bf08baf5a (diff)
downloadkafka-python-ccf0de0f4d0e41ec7cded0ef0f053f3e8cf9b6f1.tar.gz
python-snappy does not like buffer-slices on pypy...
-rw-r--r--kafka/codec.py14
1 files changed, 12 insertions, 2 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 09075e1..5d2c8fc 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,5 +1,6 @@
import gzip
from io import BytesIO
+import platform
import struct
import six
@@ -22,6 +23,7 @@ except ImportError:
lz4_encode = None
lz4_decode = None
+PYPY = bool(platform.python_implementation() == 'PyPy')
def has_gzip():
return True
@@ -101,17 +103,25 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
out.write(struct.pack('!' + fmt, dat))
# Chunk through buffers to avoid creating intermediate slice copies
- if six.PY2:
+ if PYPY:
+ # on pypy, snappy.compress() on a sliced buffer consumes the entire
+ # buffer... likely a python-snappy bug, so just use a slice copy
+ chunker = lambda payload, i, size: payload[i:size+i]
+
+ elif six.PY2:
+ # Sliced buffer avoids additional copies
# pylint: disable-msg=undefined-variable
chunker = lambda payload, i, size: buffer(payload, i, size)
else:
+ # snappy.compress does not like raw memoryviews, so we have to convert
+ # tobytes, which is a copy... oh well. it's the thought that counts.
# pylint: disable-msg=undefined-variable
chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()
for chunk in (chunker(payload, i, xerial_blocksize)
for i in xrange(0, len(payload), xerial_blocksize)):
- block = snappy.compress(chunk) # this wont accept a raw memoryview...?
+ block = snappy.compress(chunk)
block_size = len(block)
out.write(struct.pack('!i', block_size))
out.write(block)