diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-25 22:11:18 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-25 22:11:18 -0800 |
commit | ccf0de0f4d0e41ec7cded0ef0f053f3e8cf9b6f1 (patch) | |
tree | aea053870008f097f23c8b119b910b0bceae6299 | |
parent | a2e9eb5214da94ee8d71a66315ed4a8bf08baf5a (diff) | |
download | kafka-python-ccf0de0f4d0e41ec7cded0ef0f053f3e8cf9b6f1.tar.gz |
python-snappy does not like buffer-slices on pypy...
-rw-r--r-- | kafka/codec.py | 14 |
1 files changed, 12 insertions, 2 deletions
```diff
diff --git a/kafka/codec.py b/kafka/codec.py
index 09075e1..5d2c8fc 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,5 +1,6 @@
 import gzip
 from io import BytesIO
+import platform
 import struct
 
 import six
@@ -22,6 +23,7 @@ except ImportError:
     lz4_encode = None
     lz4_decode = None
 
+PYPY = bool(platform.python_implementation() == 'PyPy')
 
 def has_gzip():
     return True
@@ -101,17 +103,25 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
         out.write(struct.pack('!' + fmt, dat))
 
     # Chunk through buffers to avoid creating intermediate slice copies
-    if six.PY2:
+    if PYPY:
+        # on pypy, snappy.compress() on a sliced buffer consumes the entire
+        # buffer... likely a python-snappy bug, so just use a slice copy
+        chunker = lambda payload, i, size: payload[i:size+i]
+
+    elif six.PY2:
+        # Sliced buffer avoids additional copies
         # pylint: disable-msg=undefined-variable
         chunker = lambda payload, i, size: buffer(payload, i, size)
     else:
+        # snappy.compress does not like raw memoryviews, so we have to convert
+        # tobytes, which is a copy... oh well. it's the thought that counts.
         # pylint: disable-msg=undefined-variable
         chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()
 
     for chunk in (chunker(payload, i, xerial_blocksize)
                   for i in xrange(0, len(payload), xerial_blocksize)):
 
-        block = snappy.compress(chunk) # this wont accept a raw memoryview...?
+        block = snappy.compress(chunk)
         block_size = len(block)
         out.write(struct.pack('!i', block_size))
         out.write(block)
```