summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsif Saif Uddin <auvipy@gmail.com>2019-06-29 12:37:58 +0600
committerGitHub <noreply@github.com>2019-06-29 12:37:58 +0600
commitd9d5bd65065b65c62e9197849b918af77e720718 (patch)
treef5d1655028a7ca6e06acc6a3449d6683c5de3d6f
parentac6c7b975469cf335b88760ec349ab3b149105fa (diff)
downloadkombu-revert-1058-revert-1041-master.tar.gz
Revert "Revert "Use SIMEMBERS instead of SMEMBERS to check for queue (redis broker) (#1041)" (#1058)"revert-1058-revert-1041-master
This reverts commit fcb8e05cc95bcaf18a23092416f654b88b015163.
-rw-r--r--kombu/transport/redis.py18
-rw-r--r--kombu/transport/virtual/base.py16
-rw-r--r--kombu/transport/virtual/exchange.py2
-rw-r--r--t/unit/transport/test_redis.py3
4 files changed, 38 insertions, 1 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 7d3cdab3..37524b5b 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -829,6 +829,24 @@ class Channel(virtual.Channel):
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
+ def _lookup_direct(self, exchange, routing_key):
+ if not exchange:
+ return [routing_key]
+
+ key = self.keyprefix_queue % exchange
+ pattern = ''
+ queue = routing_key
+ queue_bind = self.sep.join([
+ routing_key or '',
+ pattern,
+ queue or '',
+ ])
+ with self.conn_or_acquire() as client:
+ if client.sismember(key, queue_bind):
+ return [queue]
+
+ return []
+
def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index 61ede626..9450cad2 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -725,6 +725,22 @@ class Channel(AbstractChannel, base.StdChannel):
R = [default]
return R
+ def _lookup_direct(self, exchange, routing_key):
+ """Find queue matching `routing_key` for given direct `exchange`.
+
+ Returns:
+ str: queue name
+ """
+ if not exchange:
+ return [routing_key]
+
+ return self.exchange_types['direct'].lookup(
+ table=self.get_table(exchange),
+ exchange=exchange,
+ routing_key=routing_key,
+ default=None,
+ )
+
def _restore(self, message):
"""Redeliver message to its original destination."""
delivery_info = message.delivery_info
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index 51909a1d..4f9e4fb7 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -65,7 +65,7 @@ class DirectExchange(ExchangeType):
}
def deliver(self, message, exchange, routing_key, **kwargs):
- _lookup = self.channel._lookup
+ _lookup = self.channel._lookup_direct
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 25718787..1dcfe71e 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -88,6 +88,9 @@ class Client(object):
def smembers(self, key):
return self.sets.get(key, set())
+ def sismember(self, name, value):
+ return value in self.sets.get(name, set())
+
def ping(self, *args, **kwargs):
return True