diff options
author | Sergey Prokazov <prokazov@users.noreply.github.com> | 2023-02-06 05:46:45 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-06 13:46:45 +0200 |
commit | ffbe879549c8a212ba70e25ee9a0367187753669 (patch) | |
tree | 2503035cf4bb98b627337b29116c0dc382eb07da | |
parent | 31a1c0b7908e157f80e39a00597defa4d76c5ca1 (diff) | |
download | redis-py-ffbe879549c8a212ba70e25ee9a0367187753669.tar.gz |
Use hiredis::pack_command to serialize the commands. (#2570)
* Implemented pack command and pack bytes
* 1) Refactored the command packer construction process.
2) hiredis.pack_bytes is now the default choice, though it's still possible to run redisrs-py (fix the flag in utils.py) or hiredis.pack_command (flag in connection.py).
* Switch to hiredis.pack_command
* Remove the rust extension module.
* 1) Introduce HIREDIS_PACK_AVAILABLE environment variable.
2) Extract serialization functionality out of Connection class.
* 1) Fix typo.
2) Add change log entry.
3) Revert the benchmark changes
* Ditch the hiredis version check for pack_command.
* Fix linter errors
* Revert version changes
* Fix linter issues
* Looks like the current redis-py version is 4.4.1
---------
Co-authored-by: Sergey Prokazov <sergey.prokazov@redis.com>
-rw-r--r-- | CHANGES | 1 | ||||
-rw-r--r--[-rwxr-xr-x] | redis/connection.py | 135 | ||||
-rw-r--r-- | redis/utils.py | 2 | ||||
-rw-r--r-- | tests/test_encoding.py | 5 |
4 files changed, 98 insertions, 45 deletions
@@ -1,3 +1,4 @@ + * Use hiredis-py pack_command if available. * Support `.unlink()` in ClusterPipeline * Simplify synchronous SocketBuffer state management * Fix string cleanse in Redis Graph diff --git a/redis/connection.py b/redis/connection.py index 57f0a3a..114221d 100755..100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -3,6 +3,7 @@ import errno import io import os import socket +import sys import threading import weakref from io import SEEK_END @@ -32,7 +33,12 @@ from redis.exceptions import ( TimeoutError, ) from redis.retry import Retry -from redis.utils import CRYPTOGRAPHY_AVAILABLE, HIREDIS_AVAILABLE, str_if_bytes +from redis.utils import ( + CRYPTOGRAPHY_AVAILABLE, + HIREDIS_AVAILABLE, + HIREDIS_PACK_AVAILABLE, + str_if_bytes, +) try: import ssl @@ -509,6 +515,75 @@ else: DefaultParser = PythonParser +class HiredisRespSerializer: + def pack(self, *args): + """Pack a series of arguments into the Redis protocol""" + output = [] + + if isinstance(args[0], str): + args = tuple(args[0].encode().split()) + args[1:] + elif b" " in args[0]: + args = tuple(args[0].split()) + args[1:] + try: + output.append(hiredis.pack_command(args)) + except TypeError: + _, value, traceback = sys.exc_info() + raise DataError(value).with_traceback(traceback) + + return output + + +class PythonRespSerializer: + def __init__(self, buffer_cutoff, encode) -> None: + self._buffer_cutoff = buffer_cutoff + self.encode = encode + + def pack(self, *args): + """Pack a series of arguments into the Redis protocol""" + output = [] + # the client might have included 1 or more literal arguments in + # the command name, e.g., 'CONFIG GET'. The Redis server expects these + # arguments to be sent separately, so split the first argument + # manually. These arguments should be bytestrings so that they are + # not encoded. 
+ if isinstance(args[0], str): + args = tuple(args[0].encode().split()) + args[1:] + elif b" " in args[0]: + args = tuple(args[0].split()) + args[1:] + + buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF)) + + buffer_cutoff = self._buffer_cutoff + for arg in map(self.encode, args): + # to avoid large string mallocs, chunk the command into the + # output list if we're sending large values or memoryviews + arg_length = len(arg) + if ( + len(buff) > buffer_cutoff + or arg_length > buffer_cutoff + or isinstance(arg, memoryview) + ): + buff = SYM_EMPTY.join( + (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF) + ) + output.append(buff) + output.append(arg) + buff = SYM_CRLF + else: + buff = SYM_EMPTY.join( + ( + buff, + SYM_DOLLAR, + str(arg_length).encode(), + SYM_CRLF, + arg, + SYM_CRLF, + ) + ) + output.append(buff) + return output + + class Connection: "Manages TCP communication to and from a Redis server" @@ -536,6 +611,7 @@ class Connection: retry=None, redis_connect_func=None, credential_provider: Optional[CredentialProvider] = None, + command_packer=None, ): """ Initialize a new Connection. 
@@ -590,6 +666,7 @@ class Connection: self.set_parser(parser_class) self._connect_callbacks = [] self._buffer_cutoff = 6000 + self._command_packer = self._construct_command_packer(command_packer) def __repr__(self): repr_args = ",".join([f"{k}={v}" for k, v in self.repr_pieces()]) @@ -607,6 +684,14 @@ class Connection: except Exception: pass + def _construct_command_packer(self, packer): + if packer is not None: + return packer + elif HIREDIS_PACK_AVAILABLE: + return HiredisRespSerializer() + else: + return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode) + def register_connect_callback(self, callback): self._connect_callbacks.append(weakref.WeakMethod(callback)) @@ -827,7 +912,8 @@ class Connection: def send_command(self, *args, **kwargs): """Pack and send a command to the Redis server""" self.send_packed_command( - self.pack_command(*args), check_health=kwargs.get("check_health", True) + self._command_packer.pack(*args), + check_health=kwargs.get("check_health", True), ) def can_read(self, timeout=0): @@ -872,48 +958,7 @@ class Connection: def pack_command(self, *args): """Pack a series of arguments into the Redis protocol""" - output = [] - # the client might have included 1 or more literal arguments in - # the command name, e.g., 'CONFIG GET'. The Redis server expects these - # arguments to be sent separately, so split the first argument - # manually. These arguments should be bytestrings so that they are - # not encoded. 
- if isinstance(args[0], str): - args = tuple(args[0].encode().split()) + args[1:] - elif b" " in args[0]: - args = tuple(args[0].split()) + args[1:] - - buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF)) - - buffer_cutoff = self._buffer_cutoff - for arg in map(self.encoder.encode, args): - # to avoid large string mallocs, chunk the command into the - # output list if we're sending large values or memoryviews - arg_length = len(arg) - if ( - len(buff) > buffer_cutoff - or arg_length > buffer_cutoff - or isinstance(arg, memoryview) - ): - buff = SYM_EMPTY.join( - (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF) - ) - output.append(buff) - output.append(arg) - buff = SYM_CRLF - else: - buff = SYM_EMPTY.join( - ( - buff, - SYM_DOLLAR, - str(arg_length).encode(), - SYM_CRLF, - arg, - SYM_CRLF, - ) - ) - output.append(buff) - return output + return self._command_packer.pack(*args) def pack_commands(self, commands): """Pack multiple commands into the Redis protocol""" @@ -923,7 +968,7 @@ class Connection: buffer_cutoff = self._buffer_cutoff for cmd in commands: - for chunk in self.pack_command(*cmd): + for chunk in self._command_packer.pack(*cmd): chunklen = len(chunk) if ( buffer_length > buffer_cutoff diff --git a/redis/utils.py b/redis/utils.py index 693d4e6..d95e62c 100644 --- a/redis/utils.py +++ b/redis/utils.py @@ -7,8 +7,10 @@ try: # Only support Hiredis >= 1.0: HIREDIS_AVAILABLE = not hiredis.__version__.startswith("0.") + HIREDIS_PACK_AVAILABLE = hasattr(hiredis, "pack_command") except ImportError: HIREDIS_AVAILABLE = False + HIREDIS_PACK_AVAILABLE = False try: import cryptography # noqa diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 2867640..cb9c4e2 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -2,6 +2,7 @@ import pytest import redis from redis.connection import Connection +from redis.utils import HIREDIS_PACK_AVAILABLE from .conftest import _get_client @@ -75,6 +76,10 @@ class 
TestEncodingErrors: assert r.get("a") == "foo\ufffd" +@pytest.mark.skipif( + HIREDIS_PACK_AVAILABLE, + reason="Packing via hiredis does not preserve memoryviews", +) class TestMemoryviewsAreNotPacked: def test_memoryviews_are_not_packed(self): c = Connection() |