summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2018-02-21 23:05:31 +0200
committerGitHub <noreply@github.com>2018-02-21 23:05:31 +0200
commit0c0c7eae13f3b2b8e3ed7c443adef39cb6802a67 (patch)
tree2cb5e5660842baa9dcd08a7f716c3a4461f20de7
parent92635d9bfff5593ba865003dd3010a0feb280140 (diff)
downloadkafka-python-0c0c7eae13f3b2b8e3ed7c443adef39cb6802a67.tar.gz
Use hardware accelerated CRC32C function if available (#1389)
* Use hardware accelerated CRC32C function if available * Add doc notice of optional `crc32c` package
-rw-r--r--docs/install.rst13
-rw-r--r--kafka/record/util.py14
-rw-r--r--test/record/test_util.py5
-rw-r--r--tox.ini1
4 files changed, 28 insertions, 5 deletions
diff --git a/docs/install.rst b/docs/install.rst
index cc0e82d..fe740f6 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -70,3 +70,16 @@ Install the `python-snappy` module
.. code:: bash
pip install python-snappy
+
+
+Optional crc32c install
+***********************
+Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python`
+uses a new message protocol version, that requires calculation of `crc32c`,
+which differs from `zlib.crc32` hash implementation. By default `kafka-python`
+calculates it in pure python, which is quite slow. To speed it up we optionally
+support https://pypi.python.org/pypi/crc32c package if it's installed.
+
+.. code:: bash
+
+ pip install crc32c
diff --git a/kafka/record/util.py b/kafka/record/util.py
index 55d7adb..74b9a69 100644
--- a/kafka/record/util.py
+++ b/kafka/record/util.py
@@ -1,6 +1,10 @@
import binascii
from kafka.record._crc32c import crc as crc32c_py
+try:
+ from crc32c import crc32 as crc32c_c
+except ImportError:
+ crc32c_c = None
def encode_varint(value, write):
@@ -113,11 +117,15 @@ def decode_varint(buffer, pos=0):
raise ValueError("Out of int64 range")
-def calc_crc32c(memview):
+_crc32c = crc32c_py
+if crc32c_c is not None:
+ _crc32c = crc32c_c
+
+
+def calc_crc32c(memview, _crc32c=_crc32c):
""" Calculate CRC-32C (Castagnoli) checksum over a memoryview of data
"""
- crc = crc32c_py(memview)
- return crc
+ return _crc32c(memview)
def calc_crc32(memview):
diff --git a/test/record/test_util.py b/test/record/test_util.py
index bfe0fcc..0b2782e 100644
--- a/test/record/test_util.py
+++ b/test/record/test_util.py
@@ -68,9 +68,10 @@ def test_size_of_varint(encoded, decoded):
assert util.size_of_varint(decoded) == len(encoded)
-def test_crc32c():
+@pytest.mark.parametrize("crc32_func", [util.crc32c_c, util.crc32c_py])
+def test_crc32c(crc32_func):
def make_crc(data):
- crc = util.calc_crc32c(data)
+ crc = crc32_func(data)
return struct.pack(">I", crc)
assert make_crc(b"") == b"\x00\x00\x00\x00"
assert make_crc(b"a") == b"\xc1\xd0\x43\x30"
diff --git a/tox.ini b/tox.ini
index 0f1aaf4..35dc842 100644
--- a/tox.ini
+++ b/tox.ini
@@ -18,6 +18,7 @@ deps =
python-snappy
lz4
xxhash
+ crc32c
py26: unittest2
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}