summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStevan Milic <stevan.milic@yahoo.com>2019-05-29 15:45:09 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2019-05-29 19:45:09 +0600
commit73d2219887dfc5469c0c1ea382baf4749049aef2 (patch)
treee274349baba656902f6c061d6d3dd916e41679f7
parentd4eab78ab47a5a008afccb5671ab3c5cd311c516 (diff)
downloadkombu-73d2219887dfc5469c0c1ea382baf4749049aef2.tar.gz
Use SIMEMBERS instead of SMEMBERS to check for queue (redis broker) (#1041)
* Add `_lookup_direct` method to virtual channel. (#994) Add possibility to optimize lookup for queue in direct exchange set. * Add `_lookup_direct` method to redis virtual channel. (#994) Use `SISMEMBER` instead of `SMEMBERS` command to check if queue exists in a set. Time complexity is increased from O(N) to O(1) where N is the set cardinality.
-rw-r--r--kombu/transport/redis.py18
-rw-r--r--kombu/transport/virtual/base.py16
-rw-r--r--kombu/transport/virtual/exchange.py2
-rw-r--r--t/unit/transport/test_redis.py3
4 files changed, 38 insertions, 1 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 7d3cdab3..37524b5b 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -829,6 +829,24 @@ class Channel(virtual.Channel):
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
+ def _lookup_direct(self, exchange, routing_key):
+ if not exchange:
+ return [routing_key]
+
+ key = self.keyprefix_queue % exchange
+ pattern = ''
+ queue = routing_key
+ queue_bind = self.sep.join([
+ routing_key or '',
+ pattern,
+ queue or '',
+ ])
+ with self.conn_or_acquire() as client:
+ if client.sismember(key, queue_bind):
+ return [queue]
+
+ return []
+
def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index 61ede626..9450cad2 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -725,6 +725,22 @@ class Channel(AbstractChannel, base.StdChannel):
R = [default]
return R
+ def _lookup_direct(self, exchange, routing_key):
+ """Find queue matching `routing_key` for given direct `exchange`.
+
+ Returns:
+ str: queue name
+ """
+ if not exchange:
+ return [routing_key]
+
+ return self.exchange_types['direct'].lookup(
+ table=self.get_table(exchange),
+ exchange=exchange,
+ routing_key=routing_key,
+ default=None,
+ )
+
def _restore(self, message):
"""Redeliver message to its original destination."""
delivery_info = message.delivery_info
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index 51909a1d..4f9e4fb7 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -65,7 +65,7 @@ class DirectExchange(ExchangeType):
}
def deliver(self, message, exchange, routing_key, **kwargs):
- _lookup = self.channel._lookup
+ _lookup = self.channel._lookup_direct
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 25718787..1dcfe71e 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -88,6 +88,9 @@ class Client(object):
def smembers(self, key):
return self.sets.get(key, set())
+ def sismember(self, name, value):
+ return value in self.sets.get(name, set())
+
def ping(self, *args, **kwargs):
return True