summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsif Saif Uddin <auvipy@gmail.com>2019-06-11 17:48:49 +0600
committerGitHub <noreply@github.com>2019-06-11 17:48:49 +0600
commit37071c6f72aed386c130068ae3e28935d16d18e9 (patch)
treed852e30ccea8ebeff75c704779a4ce9df4e949d3
parentf9fbd8eb48f21f9e308013209b33758aa5ed47e5 (diff)
downloadkombu-revert-1041-master.tar.gz
Revert "Use SIMEMBERS instead of SMEMBERS to check for queue (redis broker) (#1041)"revert-1041-master
This reverts commit 73d2219887dfc5469c0c1ea382baf4749049aef2.
-rw-r--r--kombu/transport/redis.py18
-rw-r--r--kombu/transport/virtual/base.py16
-rw-r--r--kombu/transport/virtual/exchange.py2
-rw-r--r--t/unit/transport/test_redis.py3
4 files changed, 1 insertions, 38 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 37524b5b..7d3cdab3 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -829,24 +829,6 @@ class Channel(virtual.Channel):
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
- def _lookup_direct(self, exchange, routing_key):
- if not exchange:
- return [routing_key]
-
- key = self.keyprefix_queue % exchange
- pattern = ''
- queue = routing_key
- queue_bind = self.sep.join([
- routing_key or '',
- pattern,
- queue or '',
- ])
- with self.conn_or_acquire() as client:
- if client.sismember(key, queue_bind):
- return [queue]
-
- return []
-
def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index 9450cad2..61ede626 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -725,22 +725,6 @@ class Channel(AbstractChannel, base.StdChannel):
R = [default]
return R
- def _lookup_direct(self, exchange, routing_key):
- """Find queue matching `routing_key` for given direct `exchange`.
-
- Returns:
- str: queue name
- """
- if not exchange:
- return [routing_key]
-
- return self.exchange_types['direct'].lookup(
- table=self.get_table(exchange),
- exchange=exchange,
- routing_key=routing_key,
- default=None,
- )
-
def _restore(self, message):
"""Redeliver message to its original destination."""
delivery_info = message.delivery_info
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index 4f9e4fb7..51909a1d 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -65,7 +65,7 @@ class DirectExchange(ExchangeType):
}
def deliver(self, message, exchange, routing_key, **kwargs):
- _lookup = self.channel._lookup_direct
+ _lookup = self.channel._lookup
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 1dcfe71e..25718787 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -88,9 +88,6 @@ class Client(object):
def smembers(self, key):
return self.sets.get(key, set())
- def sismember(self, name, value):
- return value in self.sets.get(name, set())
-
def ping(self, *args, **kwargs):
return True