From 73d2219887dfc5469c0c1ea382baf4749049aef2 Mon Sep 17 00:00:00 2001 From: Stevan Milic Date: Wed, 29 May 2019 15:45:09 +0200 Subject: Use SIMEMBERS instead of SMEMBERS to check for queue (redis broker) (#1041) * Add `_lookup_direct` method to virtual channel. (#994) Add possibility to optimize lookup for queue in direct exchange set. * Add `_lookup_direct` method to redis virtual channel. (#994) Use `SISMEMBER` instead of `SMEMBERS` command to check if queue exists in a set. Time complexity is increased from O(N) to O(1) where N is the set cardinality. --- kombu/transport/redis.py | 18 ++++++++++++++++++ kombu/transport/virtual/base.py | 16 ++++++++++++++++ kombu/transport/virtual/exchange.py | 2 +- t/unit/transport/test_redis.py | 3 +++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 7d3cdab3..37524b5b 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -829,6 +829,24 @@ class Channel(virtual.Channel): raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key)) return [tuple(bytes_to_str(val).split(self.sep)) for val in values] + def _lookup_direct(self, exchange, routing_key): + if not exchange: + return [routing_key] + + key = self.keyprefix_queue % exchange + pattern = '' + queue = routing_key + queue_bind = self.sep.join([ + routing_key or '', + pattern, + queue or '', + ]) + with self.conn_or_acquire() as client: + if client.sismember(key, queue_bind): + return [queue] + + return [] + def _purge(self, queue): with self.conn_or_acquire() as client: with client.pipeline() as pipe: diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py index 61ede626..9450cad2 100644 --- a/kombu/transport/virtual/base.py +++ b/kombu/transport/virtual/base.py @@ -725,6 +725,22 @@ class Channel(AbstractChannel, base.StdChannel): R = [default] return R + def _lookup_direct(self, exchange, routing_key): + """Find queue matching `routing_key` for given direct `exchange`. + + Returns: + str: queue name + """ + if not exchange: + return [routing_key] + + return self.exchange_types['direct'].lookup( + table=self.get_table(exchange), + exchange=exchange, + routing_key=routing_key, + default=None, + ) + def _restore(self, message): """Redeliver message to its original destination.""" delivery_info = message.delivery_info diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index 51909a1d..4f9e4fb7 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -65,7 +65,7 @@ class DirectExchange(ExchangeType): } def deliver(self, message, exchange, routing_key, **kwargs): - _lookup = self.channel._lookup + _lookup = self.channel._lookup_direct _put = self.channel._put for queue in _lookup(exchange, routing_key): _put(queue, message, **kwargs) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 25718787..1dcfe71e 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -88,6 +88,9 @@ class Client(object): def smembers(self, key): return self.sets.get(key, set()) + def sismember(self, name, value): + return value in self.sets.get(name, set()) + def ping(self, *args, **kwargs): return True -- cgit v1.2.1