summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGábor Boros <gabor.brs@gmail.com>2021-08-25 16:08:17 +0400
committerGitHub <noreply@github.com>2021-08-25 14:08:17 +0200
commit39584a194055bc42f4836a5213eb0be141001500 (patch)
tree2539852339806cf65e540608dab8177b7693502f
parent8d15a6602dd48c40cfa3105852a81e4369102cb3 (diff)
downloadkombu-39584a194055bc42f4836a5213eb0be141001500.tar.gz
Add global key prefix for keys set by Redis transporter (#1349)
* Introduce global key prefix for redis transport Co-authored-by: Matus Valo <matusvalo@users.noreply.github.com> * refactor: use a custom redis client As per the suggestions, refactor the redis key prefixing to use a custom redis client that prefixes the keys it uses. The custom client implementation does not prefix every key by default as the way of prefixing keys may differ for some redis commands, instead it lists those keys that will be prefixed. In case of commands, where multiple keys can be passed as an argument, the custom client defines where the arg positions are starting and ending for the given command. * test: fix unit tests by moving import statement * fix: wrap redis.parse_response to remove key prefixes Co-authored-by: Matus Valo <matusvalo@users.noreply.github.com> * fix: typo * fix: lint Co-authored-by: Antonin Delpeuch <antonin@delpeuch.eu> Co-authored-by: Matus Valo <matusvalo@users.noreply.github.com> Co-authored-by: Jillian Vogel <jill@opencraft.com>
-rw-r--r--AUTHORS1
-rw-r--r--kombu/transport/redis.py125
-rw-r--r--t/unit/transport/test_redis.py94
3 files changed, 218 insertions, 2 deletions
diff --git a/AUTHORS b/AUTHORS
index 34a47ab1..90ae30bd 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -53,6 +53,7 @@ Fernando Jorge Mota <f.j.mota13@gmail.com>
Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>
Florian Munz <surf@theflow.de>
Franck Cuny <fcuny@saymedia.com>
+Gábor Boros <gabor.brs@gmail.com>
Germán M. Bravo <german.mb@gmail.com>
Gregory Haskins <greg@greghaskins.com>
Hank John <jindongh@gmail.com>
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 5b4e1bca..02d06b45 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -38,6 +38,8 @@ Transport Options
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
+* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
+ used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
@@ -49,6 +51,7 @@ Transport Options
* ``priority_steps``
"""
+import functools
import numbers
import socket
from bisect import bisect
@@ -179,6 +182,108 @@ def _after_fork_cleanup_channel(channel):
channel._after_fork()
+class GlobalKeyPrefixMixin:
+ """Mixin to provide common logic for global key prefixing.
+
+ Overriding all the methods used by Kombu with the same key prefixing logic
+ would be cumbersome and inefficient. Hence, we override the command
+ execution logic that is called by all commands.
+ """
+
+ PREFIXED_SIMPLE_COMMANDS = [
+ "HDEL",
+ "HGET",
+ "HSET",
+ "LLEN",
+ "LPUSH",
+ "PUBLISH",
+ "SADD",
+ "SET",
+ "SMEMBERS",
+ "ZADD",
+ "ZREM",
+ "ZREVRANGEBYSCORE",
+ ]
+
+ PREFIXED_COMPLEX_COMMANDS = {
+ "BRPOP": {"args_start": 0, "args_end": -1},
+ "EVALSHA": {"args_start": 2, "args_end": 3},
+ }
+
+ def _prefix_args(self, args):
+ args = list(args)
+ command = args.pop(0)
+
+ if command in self.PREFIXED_SIMPLE_COMMANDS:
+ args[0] = self.global_keyprefix + str(args[0])
+
+ if command in self.PREFIXED_COMPLEX_COMMANDS.keys():
+ args_start = self.PREFIXED_COMPLEX_COMMANDS[command]["args_start"]
+ args_end = self.PREFIXED_COMPLEX_COMMANDS[command]["args_end"]
+
+ pre_args = args[:args_start] if args_start > 0 else []
+
+ if args_end is not None:
+ post_args = args[args_end:]
+ elif args_end < 0:
+ post_args = args[len(args):]
+ else:
+ post_args = []
+
+ args = pre_args + [
+ self.global_keyprefix + str(arg)
+ for arg in args[args_start:args_end]
+ ] + post_args
+
+ return [command, *args]
+
+ def parse_response(self, connection, command_name, **options):
+ """Parse a response from the Redis server.
+
+ Method wraps ``redis.parse_response()`` to remove prefixes of keys
+ returned by redis command.
+ """
+ ret = super().parse_response(connection, command_name, **options)
+ if command_name == 'BRPOP' and ret:
+ key, value = ret
+ key = key[len(self.global_keyprefix):]
+ return key, value
+ return ret
+
+ def execute_command(self, *args, **kwargs):
+ return super().execute_command(*self._prefix_args(args), **kwargs)
+
+ def pipeline(self, transaction=True, shard_hint=None):
+ return PrefixedRedisPipeline(
+ self.connection_pool,
+ self.response_callbacks,
+ transaction,
+ shard_hint,
+ global_keyprefix=self.global_keyprefix,
+ )
+
+
+class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis):
+ """Returns a ``StrictRedis`` client that prefixes the keys it uses."""
+
+ def __init__(self, *args, **kwargs):
+ self.global_keyprefix = kwargs.pop('global_keyprefix', '')
+ redis.Redis.__init__(self, *args, **kwargs)
+
+
+class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):
+ """Custom Redis pipeline that takes global_keyprefix into consideration.
+
+ As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
+ the keys it uses, the pipeline called by the client must be able to prefix
+ the keys as well.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.global_keyprefix = kwargs.pop('global_keyprefix', '')
+ redis.client.Pipeline.__init__(self, *args, **kwargs)
+
+
class QoS(virtual.QoS):
"""Redis Ack Emulation."""
@@ -485,6 +590,11 @@ class Channel(virtual.Channel):
#: Disable for backwards compatibility with Kombu 3.x.
fanout_patterns = True
+ #: The global key prefix will be prepended to all keys used
+ #: by Kombu, which can be useful when a redis database is shared
+ #: by different users. By default, no prefix is prepended.
+ global_keyprefix = ''
+
#: Order in which we consume from queues.
#:
#: Can be either string alias, or a cycle strategy class
@@ -526,6 +636,7 @@ class Channel(virtual.Channel):
'unacked_restore_limit',
'fanout_prefix',
'fanout_patterns',
+ 'global_keyprefix',
'socket_timeout',
'socket_connect_timeout',
'socket_keepalive',
@@ -769,7 +880,12 @@ class Channel(virtual.Channel):
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
- self.client.connection.send_command('BRPOP', *keys)
+
+ command_args = ['BRPOP', *keys]
+ if self.global_keyprefix:
+ command_args = self.client._prefix_args(command_args)
+
+ self.client.connection.send_command(*command_args)
def _brpop_read(self, **options):
try:
@@ -1025,6 +1141,13 @@ class Channel(virtual.Channel):
raise VersionMismatch(
'Redis transport requires redis-py versions 3.2.0 or later. '
'You have {0.__version__}'.format(redis))
+
+ if self.global_keyprefix:
+ return functools.partial(
+ PrefixedStrictRedis,
+ global_keyprefix=self.global_keyprefix,
+ )
+
return redis.StrictRedis
@contextmanager
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 52ea7f81..3e1f76f5 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -746,7 +746,7 @@ class test_Channel:
def test_get_client(self):
import redis as R
KombuRedis = redis.Channel._get_client(self.channel)
- assert KombuRedis
+ assert isinstance(KombuRedis(), R.StrictRedis)
Rv = getattr(R, 'VERSION', None)
try:
@@ -757,6 +757,12 @@ class test_Channel:
if Rv is not None:
R.VERSION = Rv
+ def test_get_prefixed_client(self):
+ from kombu.transport.redis import PrefixedStrictRedis
+ self.channel.global_keyprefix = "test_"
+ PrefixedRedis = redis.Channel._get_client(self.channel)
+ assert isinstance(PrefixedRedis(), PrefixedStrictRedis)
+
def test_get_response_error(self):
from redis.exceptions import ResponseError
assert redis.Channel._get_response_error(self.channel) is ResponseError
@@ -926,6 +932,43 @@ class test_Channel:
('celery', '', 'celery'),
]
+ @patch("redis.StrictRedis.execute_command")
+ def test_global_keyprefix(self, mock_execute_command):
+ from kombu.transport.redis import PrefixedStrictRedis
+
+ with Connection(transport=Transport) as conn:
+ client = PrefixedStrictRedis(global_keyprefix='foo_')
+
+ channel = conn.channel()
+ channel._create_client = Mock()
+ channel._create_client.return_value = client
+
+ body = {'hello': 'world'}
+ channel._put_fanout('exchange', body, '')
+ mock_execute_command.assert_called_with(
+ 'PUBLISH',
+ 'foo_/{db}.exchange',
+ dumps(body)
+ )
+
+ @patch("redis.StrictRedis.execute_command")
+ def test_global_keyprefix_queue_bind(self, mock_execute_command):
+ from kombu.transport.redis import PrefixedStrictRedis
+
+ with Connection(transport=Transport) as conn:
+ client = PrefixedStrictRedis(global_keyprefix='foo_')
+
+ channel = conn.channel()
+ channel._create_client = Mock()
+ channel._create_client.return_value = client
+
+ channel._queue_bind('default', '', None, 'queue')
+ mock_execute_command.assert_called_with(
+ 'SADD',
+ 'foo__kombu.binding.default',
+ '\x06\x16\x06\x16queue'
+ )
+
class test_Redis:
@@ -1500,3 +1543,52 @@ class test_RedisSentinel:
from kombu.transport.redis import SentinelManagedSSLConnection
assert (params['connection_class'] is
SentinelManagedSSLConnection)
+
+
+class test_GlobalKeyPrefixMixin:
+
+ from kombu.transport.redis import GlobalKeyPrefixMixin
+
+ global_keyprefix = "prefix_"
+ mixin = GlobalKeyPrefixMixin()
+ mixin.global_keyprefix = global_keyprefix
+
+ def test_prefix_simple_args(self):
+ for command in self.mixin.PREFIXED_SIMPLE_COMMANDS:
+ prefixed_args = self.mixin._prefix_args([command, "fake_key"])
+ assert prefixed_args == [
+ command,
+ f"{self.global_keyprefix}fake_key"
+ ]
+
+ def test_prefix_brpop_args(self):
+ prefixed_args = self.mixin._prefix_args([
+ "BRPOP",
+ "fake_key",
+ "fake_key2",
+ "not_prefixed"
+ ])
+
+ assert prefixed_args == [
+ "BRPOP",
+ f"{self.global_keyprefix}fake_key",
+ f"{self.global_keyprefix}fake_key2",
+ "not_prefixed",
+ ]
+
+ def test_prefix_evalsha_args(self):
+ prefixed_args = self.mixin._prefix_args([
+ "EVALSHA",
+ "not_prefixed",
+ "not_prefixed",
+ "fake_key",
+ "not_prefixed",
+ ])
+
+ assert prefixed_args == [
+ "EVALSHA",
+ "not_prefixed",
+ "not_prefixed",
+ f"{self.global_keyprefix}fake_key",
+ "not_prefixed",
+ ]