diff options
author | Asif Saif Uddin <auvipy@gmail.com> | 2019-06-29 12:37:58 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-29 12:37:58 +0600 |
commit | d9d5bd65065b65c62e9197849b918af77e720718 (patch) | |
tree | f5d1655028a7ca6e06acc6a3449d6683c5de3d6f | |
parent | ac6c7b975469cf335b88760ec349ab3b149105fa (diff) | |
download | kombu-d9d5bd65065b65c62e9197849b918af77e720718.tar.gz |
Revert "Revert "Use SIMEMBERS instead of SMEMBERS to check for queue (redis broker) (#1041)" (#1058)"revert-1058-revert-1041-master
This reverts commit fcb8e05cc95bcaf18a23092416f654b88b015163.
-rw-r--r-- | kombu/transport/redis.py | 18 | ||||
-rw-r--r-- | kombu/transport/virtual/base.py | 16 | ||||
-rw-r--r-- | kombu/transport/virtual/exchange.py | 2 | ||||
-rw-r--r-- | t/unit/transport/test_redis.py | 3 |
4 files changed, 38 insertions, 1 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 7d3cdab3..37524b5b 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -829,6 +829,24 @@ class Channel(virtual.Channel): raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key)) return [tuple(bytes_to_str(val).split(self.sep)) for val in values] + def _lookup_direct(self, exchange, routing_key): + if not exchange: + return [routing_key] + + key = self.keyprefix_queue % exchange + pattern = '' + queue = routing_key + queue_bind = self.sep.join([ + routing_key or '', + pattern, + queue or '', + ]) + with self.conn_or_acquire() as client: + if client.sismember(key, queue_bind): + return [queue] + + return [] + def _purge(self, queue): with self.conn_or_acquire() as client: with client.pipeline() as pipe: diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py index 61ede626..9450cad2 100644 --- a/kombu/transport/virtual/base.py +++ b/kombu/transport/virtual/base.py @@ -725,6 +725,22 @@ class Channel(AbstractChannel, base.StdChannel): R = [default] return R + def _lookup_direct(self, exchange, routing_key): + """Find queue matching `routing_key` for given direct `exchange`. + + Returns: + str: queue name + """ + if not exchange: + return [routing_key] + + return self.exchange_types['direct'].lookup( + table=self.get_table(exchange), + exchange=exchange, + routing_key=routing_key, + default=None, + ) + def _restore(self, message): """Redeliver message to its original destination.""" delivery_info = message.delivery_info diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index 51909a1d..4f9e4fb7 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -65,7 +65,7 @@ class DirectExchange(ExchangeType): } def deliver(self, message, exchange, routing_key, **kwargs): - _lookup = self.channel._lookup + _lookup = self.channel._lookup_direct _put = self.channel._put for queue in _lookup(exchange, routing_key): _put(queue, message, **kwargs) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 25718787..1dcfe71e 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -88,6 +88,9 @@ class Client(object): def smembers(self, key): return self.sets.get(key, set()) + def sismember(self, name, value): + return value in self.sets.get(name, set()) + def ping(self, *args, **kwargs): return True |