summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsif Saif Uddin <auvipy@gmail.com>2019-10-10 17:35:48 +0600
committerGitHub <noreply@github.com>2019-10-10 17:35:48 +0600
commit9fd41a1e714a0db1c879fe0c4373c474bf18f9ff (patch)
treea9d12ed62d76a23713802e6dd9fff6b9365e2334
parentb51d1d678e198a80d7e5fd95f32674c7d8e04a75 (diff)
downloadkombu-revert-1089-issue-1087-redis-fix.tar.gz
Revert "Issue #1087 redis fix (#1089)"revert-1089-issue-1087-redis-fix
This reverts commit 2f6f5f6a5df1cf52bf8fd45ee9fb3c93d793d637.
-rw-r--r--AUTHORS1
-rw-r--r--kombu/transport/redis.py93
-rw-r--r--kombu/transport/virtual/base.py26
-rw-r--r--kombu/transport/virtual/exchange.py2
-rw-r--r--requirements/test.txt1
-rw-r--r--t/unit/transport/test_redis.py863
6 files changed, 306 insertions, 680 deletions
diff --git a/AUTHORS b/AUTHORS
index 25b54ab0..551e3a55 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -90,7 +90,6 @@ Marcin Lulek (ergo) <info@webreactor.eu>
Marcin Puhacz <marcin.puhacz@gmail.com>
Mark Lavin <mlavin@caktusgroup.com>
markow <markow@red-sky.pl>
-Matt Davis <matteius@gmail.com>
Matt Wise <wise@wiredgeek.net>
Maxime Rouyrre <rouyrre+git@gmail.com>
mdk <luc.mdk@gmail.com>
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index b4a3d362..37524b5b 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -3,12 +3,12 @@ from __future__ import absolute_import, unicode_literals
import numbers
import socket
-import warnings
from bisect import bisect
from collections import namedtuple
from contextlib import contextmanager
from time import time
+
from vine import promise
from kombu.exceptions import InconsistencyError, VersionMismatch
@@ -25,7 +25,6 @@ from kombu.utils.uuid import uuid
from kombu.utils.compat import _detect_environment
from . import virtual
-from .virtual.base import UndeliverableWarning, UNDELIVERABLE_FMT
try:
import redis
@@ -147,8 +146,12 @@ class QoS(virtual.QoS):
def append(self, message, delivery_tag):
delivery = message.delivery_info
EX, RK = delivery['exchange'], delivery['routing_key']
- # Redis-py changed the format of zadd args in v3.0.0 to be like this
- zadd_args = [{delivery_tag: time()}]
+ # TODO: Remove this once we soley on Redis-py 3.0.0+
+ if redis.VERSION[0] >= 3:
+ # Redis-py changed the format of zadd args in v3.0.0
+ zadd_args = [{delivery_tag: time()}]
+ else:
+ zadd_args = [time(), delivery_tag]
with self.pipe_or_acquire() as pipe:
pipe.zadd(self.unacked_index_key, *zadd_args) \
@@ -710,8 +713,7 @@ class Channel(virtual.Channel):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
- _q_for_pri = self._queue_for_priority
- keys = [_q_for_pri(queue, pri) for pri in self.priority_steps
+ keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
@@ -747,8 +749,7 @@ class Channel(virtual.Channel):
def _get(self, queue):
with self.conn_or_acquire() as client:
for pri in self.priority_steps:
- queue_name = self._queue_for_priority(queue, pri)
- item = client.rpop(queue_name)
+ item = client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(bytes_to_str(item))
raise Empty()
@@ -757,21 +758,14 @@ class Channel(virtual.Channel):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
- queue_name = self._queue_for_priority(queue, pri)
- pipe = pipe.llen(queue_name)
+ pipe = pipe.llen(self._q_for_pri(queue, pri))
sizes = pipe.execute()
- size = sum(size for size in sizes
+ return sum(size for size in sizes
if isinstance(size, numbers.Integral))
- return size
- def _queue_for_priority(self, queue, pri):
+ def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
- if pri:
- queue_args = (queue, self.sep, pri)
- else:
- queue_args = (queue, '', '')
- priority_queue_name = '%s%s%s' % queue_args
- return priority_queue_name
+ return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))
def priority(self, n):
steps = self.priority_steps
@@ -782,7 +776,7 @@ class Channel(virtual.Channel):
pri = self._get_message_priority(message, reverse=False)
with self.conn_or_acquire() as client:
- client.lpush(self._queue_for_priority(queue, pri), dumps(message))
+ client.lpush(self._q_for_pri(queue, pri), dumps(message))
def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
@@ -817,14 +811,14 @@ class Channel(virtual.Channel):
queue or '']))
with client.pipeline() as pipe:
for pri in self.priority_steps:
- pipe = pipe.delete(self._queue_for_priority(queue, pri))
+ pipe = pipe.delete(self._q_for_pri(queue, pri))
pipe.execute()
def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
- pipe = pipe.exists(self._queue_for_priority(queue, pri))
+ pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())
def get_table(self, exchange):
@@ -835,55 +829,30 @@ class Channel(virtual.Channel):
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
- def _lookup(self, exchange, routing_key, default=None):
- """Find all queues matching `routing_key` for the given `exchange`.
+ def _lookup_direct(self, exchange, routing_key):
+ if not exchange:
+ return [routing_key]
- Returns:
- str: queue name -- must return the string `default`
- if no queues matched.
- """
+ key = self.keyprefix_queue % exchange
pattern = ''
- result = []
- if default is None:
- default = self.deadletter_queue
- if not exchange: # anon exchange
- return [routing_key or default]
-
queue = routing_key
- redis_key = self.keyprefix_queue % exchange
- try:
- queue_bind = self.sep.join([
- routing_key or '',
- pattern,
- queue or '',
- ])
- with self.conn_or_acquire() as client:
- if not client.scard(redis_key):
- pass # Do not check if its a member because set is empty
- elif client.sismember(redis_key, queue_bind):
- result = [queue]
- except KeyError:
- pass
-
- if not result:
- if default is not None:
- warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
- exchange=exchange, routing_key=routing_key)),
- )
- self._new_queue(default)
- result = [default]
- else:
- raise InconsistencyError(NO_ROUTE_ERROR.format(
- exchange, redis_key))
+ queue_bind = self.sep.join([
+ routing_key or '',
+ pattern,
+ queue or '',
+ ])
+ with self.conn_or_acquire() as client:
+ if client.sismember(key, queue_bind):
+ return [queue]
- return result
+ return []
def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
- priority_queue = self._queue_for_priority(queue, pri)
- pipe = pipe.llen(priority_queue).delete(priority_queue)
+ priq = self._q_for_pri(queue, pri)
+ pipe = pipe.llen(priq).delete(priq)
sizes = pipe.execute()
return sum(sizes[::2])
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index fbfc1563..9450cad2 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -710,20 +710,36 @@ class Channel(AbstractChannel, base.StdChannel):
return [routing_key or default]
try:
- result = self.typeof(exchange).lookup(
+ R = self.typeof(exchange).lookup(
self.get_table(exchange),
exchange, routing_key, default,
)
except KeyError:
- result = []
+ R = []
- if not result and default is not None:
+ if not R and default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
exchange=exchange, routing_key=routing_key)),
)
self._new_queue(default)
- result = [default]
- return result
+ R = [default]
+ return R
+
+ def _lookup_direct(self, exchange, routing_key):
+ """Find queue matching `routing_key` for given direct `exchange`.
+
+ Returns:
+ str: queue name
+ """
+ if not exchange:
+ return [routing_key]
+
+ return self.exchange_types['direct'].lookup(
+ table=self.get_table(exchange),
+ exchange=exchange,
+ routing_key=routing_key,
+ default=None,
+ )
def _restore(self, message):
"""Redeliver message to its original destination."""
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index 51909a1d..4f9e4fb7 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -65,7 +65,7 @@ class DirectExchange(ExchangeType):
}
def deliver(self, message, exchange, routing_key, **kwargs):
- _lookup = self.channel._lookup
+ _lookup = self.channel._lookup_direct
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)
diff --git a/requirements/test.txt b/requirements/test.txt
index 91b210a0..ef7aa4e5 100644
--- a/requirements/test.txt
+++ b/requirements/test.txt
@@ -3,4 +3,3 @@ case>=1.5.2
pytest
pytest-sugar
Pyro4
-fakeredis==1.0.4
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 1c4f6130..1dcfe71e 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -1,35 +1,22 @@
from __future__ import absolute_import, unicode_literals
-import fakeredis
import pytest
-import redis
-import unittest
import socket
+import types
-from array import array
-from case import ANY, ContextMock, Mock, call, mock, skip, patch
from collections import defaultdict
-from contextlib import contextmanager
from itertools import count
+from case import ANY, ContextMock, Mock, call, mock, skip, patch
+
from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.exceptions import InconsistencyError, VersionMismatch
-from kombu.five import Empty, Queue as _Queue
+from kombu.five import Empty, Queue as _Queue, bytes_if_py2
from kombu.transport import virtual
from kombu.utils import eventio # patch poll
from kombu.utils.json import dumps
-_fake_redis_client = None
-
-
-def _get_fake_redis_client():
- global _fake_redis_client
- if _fake_redis_client is None:
- _fake_redis_client = FakeRedisClient()
- return _fake_redis_client
-
-
class _poll(eventio._select):
def register(self, fd, flags):
@@ -46,177 +33,33 @@ class _poll(eventio._select):
eventio.poll = _poll
# must import after poller patch, pep8 complains
-from kombu.transport import redis as kombu_redis # noqa
+from kombu.transport import redis # noqa
class ResponseError(Exception):
pass
-class DummyParser(object):
-
- def __init__(self, *args, **kwargs):
- self.socket_read_size = 1
- self.encoder = None
- self._sock = None
- self._buffer = None
-
- def on_disconnect(self):
- self.socket_read_size = 1
- self.encoder = None
- self._sock = None
- self._buffer = None
-
- def on_connect(self, connection):
- pass
-
-
-class FakeRedisSocket(fakeredis._server.FakeSocket):
- blocking = True
- filenos = count(30)
-
- def __init__(self, server):
- super(FakeRedisSocket, self).__init__(server)
- self._server = server
- self._fileno = next(self.filenos)
- self.data = []
- self.connection = None
- self.channel = None
- self.transport_options = {}
- self.hostname = None
- self.port = None
- self.password = None
- self.virtual_host = '/'
- self.max_connections = 10
- self.ssl = None
-
-
-class FakeRedisConnection(fakeredis.FakeConnection):
- disconnected = False
- default_port = 6379
- channel_max = 65535
-
- def __init__(self, client, server, **kwargs):
- kwargs['parser_class'] = DummyParser
- super(fakeredis.FakeConnection, self).__init__(**kwargs)
- if client is None:
- client = _get_fake_redis_client()
- self.client = client
- if server is None:
- server = fakeredis.FakeServer()
- self._server = server
- self._sock = FakeRedisSocket(server=server)
- try:
- self.on_connect()
- except redis.exceptions.RedisError:
- # clean up after any error in on_connect
- self.disconnect()
- raise
- self._parser = ()
- self._avail_channel_ids = array(
- virtual.base.ARRAY_TYPE_H, range(self.channel_max, 0, -1),
- )
- self.cycle = kombu_redis.MultiChannelPoller()
- conn_errs, channel_errs = kombu_redis.get_redis_error_classes()
- self.connection_errors, self.channel_errors = conn_errs, channel_errs
-
- def disconnect(self):
- self.disconnected = True
-
-
-class FakeRedisConnectionPool(redis.ConnectionPool):
- def __init__(self, connection_class, max_connections=None,
- **connection_kwargs):
- connection_class = FakeRedisConnection
- connection_kwargs['client'] = None
- connection_kwargs['server'] = None
- self._connections = []
- super(FakeRedisConnectionPool, self).__init__(
- connection_class=connection_class,
- max_connections=max_connections,
- **connection_kwargs
- )
-
- def get_connection(self, *args, **kwargs):
- connection = self.connection_class(**self.connection_kwargs)
- self._connections.append(connection)
- return connection
-
- def release(self, connection):
- pass
-
-
-class FakeRedisClient(fakeredis.FakeStrictRedis):
+class Client(object):
queues = {}
+ sets = defaultdict(set)
+ hashes = defaultdict(dict)
shard_hint = None
def __init__(self, db=None, port=None, connection_pool=None, **kwargs):
self._called = []
self._connection = None
self.bgsave_raises_ResponseError = False
- self.server = server = fakeredis.FakeServer()
- connection_pool = FakeRedisConnectionPool(FakeRedisConnection)
- self.connection_pool = connection_pool
- super(FakeRedisClient, self).__init__(
- db=db, port=port, connection_pool=connection_pool, server=server)
- self.connection = FakeRedisConnection(self, server)
- self.response_callbacks = dict()
-
- def __del__(self, key=None):
- if key:
- self.delete(key)
-
- def ping(self, *args, **kwargs):
- return True
-
- def pipeline(self):
- return FakePipeline(self.server, self.connection_pool, [], '1234', '')
-
- def set_response_callback(self, command, callback):
- pass
-
- def _new_queue(self, queue, auto_delete=False, **kwargs):
- self.queues[queue] = _Queue()
- if auto_delete:
- self.auto_delete_queues.add(queue)
-
- def rpop(self, key):
- try:
- return self.queues[key].get_nowait()
- except (KeyError, Empty):
- pass
-
- def llen(self, key):
- try:
- return self.queues[key].qsize()
- except KeyError:
- return 0
-
- def lpush(self, key, value):
- self.queues[key].put_nowait(value)
+ self.connection = self._sconnection(self)
- def pubsub(self, *args, **kwargs):
- self.connection_pool = FakeRedisConnectionPool(FakeRedisConnection)
- return self
+ def bgsave(self):
+ self._called.append('BGSAVE')
+ if self.bgsave_raises_ResponseError:
+ raise ResponseError()
def delete(self, key):
self.queues.pop(key, None)
-
-class FakeRedisClientLite(object):
- """The original FakeRedis client from Kombu to support the
- Producer/Consumer TestCases, preferred to use FakeRedisClient."""
- queues = {}
- sets = defaultdict(set)
- hashes = defaultdict(dict)
- shard_hint = None
-
- def __init__(self, db=None, port=None, connection_pool=None, **kwargs):
- self._called = []
- self._connection = None
- self.bgsave_raises_ResponseError = False
- self.connection = self._sconnection(self)
-
def exists(self, key):
return key in self.queues or key in self.sets
@@ -233,9 +76,14 @@ class FakeRedisClientLite(object):
self.sets[key].add(member)
def zadd(self, key, *args):
- (mapping,) = args
- for item in mapping:
- self.sets[key].add(item)
+ if redis.redis.VERSION[0] >= 3:
+ (mapping,) = args
+ for item in mapping:
+ self.sets[key].add(item)
+ else:
+ # TODO: remove me when we drop support for Redis-py v2
+ (score1, member1) = args
+ self.sets[key].add(member1)
def smembers(self, key):
return self.sets.get(key, set())
@@ -243,9 +91,6 @@ class FakeRedisClientLite(object):
def sismember(self, name, value):
return value in self.sets.get(name, set())
- def scard(self, key):
- return len(self.sets.get(key, set()))
-
def ping(self, *args, **kwargs):
return True
@@ -293,7 +138,7 @@ class FakeRedisClientLite(object):
return k in self._called
def pipeline(self):
- return FakePipelineLite(self)
+ return Pipeline(self)
def encode(self, value):
return str(value)
@@ -328,8 +173,22 @@ class FakeRedisClientLite(object):
def send_command(self, cmd, *args):
self._sock.data.append((cmd, args))
+ def info(self):
+ return {'foo': 1}
+
+ def pubsub(self, *args, **kwargs):
+ connection = self.connection
+
+ class ConnectionPool(object):
-class FakePipelineLite(object):
+ def get_connection(self, *args, **kwargs):
+ return connection
+ self.connection_pool = ConnectionPool()
+
+ return self
+
+
+class Pipeline(object):
def __init__(self, client):
self.client = client
@@ -357,84 +216,10 @@ class FakePipelineLite(object):
return [fun(*args, **kwargs) for fun, args, kwargs in stack]
-class FakePipeline(redis.client.Pipeline):
-
- def __init__(self, server, connection_pool,
- response_callbacks, transaction, shard_hint):
- if not server:
- server = fakeredis.FakeServer()
- self._server = server
- correct_pool_instance = isinstance(
- connection_pool, FakeRedisConnectionPool)
- if connection_pool is not None and not correct_pool_instance:
- connection_pool = FakeRedisConnectionPool(FakeRedisConnection)
- self.connection_pool = connection_pool
- self.connection = FakeRedisConnection(self, server)
- self.client = connection_pool.get_connection().client
- self.response_callbacks = response_callbacks
- self.transaction = transaction
- self.shard_hint = shard_hint
- self.watching = False
- self.reset()
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exc_info):
- pass
-
- def __getattr__(self, key):
- if key not in self.__dict__:
-
- def _add(*args, **kwargs):
- self.command_stack.append(
- (getattr(self.connection.client, key), args, kwargs))
- return self
-
- return _add
- return self.__dict__[key]
-
- def reset(self):
- # Do nothing with the real connection
- self.command_stack = []
- self.scripts = set()
-
- def execute(self):
- stack = list(self.command_stack)
- all_cmds = self.connection.pack_commands(
- [args for args, _ in self.command_stack])
- self.connection.send_packed_command(all_cmds)
-
- response = []
- for cmd in all_cmds:
- try:
- response.append(
- self.parse_response(self.connection, cmd))
- except ResponseError:
- import sys
- response.append(sys.exc_info()[1])
-
- self.raise_first_error(self.command_stack, response)
- results = []
- for t, kwargs in stack:
- redis_func_name = t[0]
- redis_func_name = redis_func_name.lower()
- if redis_func_name == 'del':
- redis_func_name = 'delete'
- args = t[1:]
- fun = getattr(self.client, redis_func_name)
- r = fun(*args, **kwargs)
- results.append(r)
-
- self.command_stack[:] = []
- self.reset()
- return results
-
-
-class FakeRedisKombuChannelLite(kombu_redis.Channel):
+class Channel(redis.Channel):
def _get_client(self):
- return FakeRedisClientLite
+ return Client
def _get_pool(self, asynchronous=False):
return Mock()
@@ -444,97 +229,42 @@ class FakeRedisKombuChannelLite(kombu_redis.Channel):
def _new_queue(self, queue, **kwargs):
for pri in self.priority_steps:
- self.client._new_queue(self._queue_for_priority(queue, pri))
+ self.client._new_queue(self._q_for_pri(queue, pri))
def pipeline(self):
- return FakePipelineLite(FakeRedisClientLite())
-
-
-class FakeRedisKombuChannel(kombu_redis.Channel):
- _fanout_queues = {}
+ return Pipeline(Client())
- def __init__(self, *args, **kwargs):
- super(FakeRedisKombuChannel, self).__init__(*args, **kwargs)
- def _get_client(self):
- return FakeRedisClient
-
- def _create_client(self, asynchronous=False):
- global _fake_redis_client
- if _fake_redis_client is None:
- _fake_redis_client = self._get_client()()
- return _fake_redis_client
-
- @contextmanager
- def conn_or_acquire(self, client=None):
- if client:
- yield client
- else:
- yield self._create_client()
-
- def _get_pool(self, asynchronous=False):
- params = self._connparams(asynchronous=asynchronous)
- self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
- return FakeRedisConnectionPool(**params)
-
- def _get_response_error(self):
- return ResponseError
-
- def _new_queue(self, queue, **kwargs):
- for pri in self.priority_steps:
- self.client._new_queue(self._queue_for_priority(queue, pri))
-
- def pipeline(self):
- yield _get_fake_redis_client().pipeline()
-
- def basic_publish(self, message, exchange='', routing_key='', **kwargs):
- self._inplace_augment_message(message, exchange, routing_key)
- # anon exchange: routing_key is the destination queue
- return self._put(routing_key, message, **kwargs)
-
-
-class FakeRedisKombuTransportLite(kombu_redis.Transport):
- Channel = FakeRedisKombuChannelLite
-
- def __init__(self, *args, **kwargs):
- super(FakeRedisKombuTransportLite, self).__init__(*args, **kwargs)
+class Transport(redis.Transport):
+ Channel = Channel
def _get_errors(self):
return ((KeyError,), (IndexError,))
-class FakeRedisKombuTransport(FakeRedisKombuTransportLite):
- Channel = FakeRedisKombuChannel
-
-
@skip.unless_module('redis')
-class TestRedisChannel(unittest.TestCase):
+class test_Channel:
- def setUp(self):
+ def setup(self):
self.connection = self.create_connection()
self.channel = self.connection.default_channel
- def tearDown(self):
- self.connection = None
- self.channel = None
- global _fake_redis_client
- _fake_redis_client = None
-
def create_connection(self, **kwargs):
kwargs.setdefault('transport_options', {'fanout_patterns': True})
- return Connection(transport=FakeRedisKombuTransport, **kwargs)
+ return Connection(transport=Transport, **kwargs)
def _get_one_delivery_tag(self, n='test_uniq_tag'):
- chan = self.connection.default_channel
- chan.exchange_declare(n)
- chan.queue_declare(n)
- chan.queue_bind(n, n, n)
- msg = chan.prepare_message('quick brown fox')
- chan.basic_publish(msg, n, n)
- payload = chan._get(n)
- assert payload
- pymsg = chan.message_to_python(payload)
- return pymsg.delivery_tag
+ with self.create_connection() as conn1:
+ chan = conn1.default_channel
+ chan.exchange_declare(n)
+ chan.queue_declare(n)
+ chan.queue_bind(n, n, n)
+ msg = chan.prepare_message('quick brown fox')
+ chan.basic_publish(msg, n, n)
+ payload = chan._get(n)
+ assert payload
+ pymsg = chan.message_to_python(payload)
+ return pymsg.delivery_tag
def test_delivery_tag_is_uuid(self):
seen = set()
@@ -547,10 +277,9 @@ class TestRedisChannel(unittest.TestCase):
assert len(tag) == 36
def test_disable_ack_emulation(self):
- conn = Connection(
- transport=FakeRedisKombuTransport,
- transport_options={'ack_emulation': False}
- )
+ conn = Connection(transport=Transport, transport_options={
+ 'ack_emulation': False,
+ })
chan = conn.channel()
assert not chan.ack_emulation
@@ -561,7 +290,7 @@ class TestRedisChannel(unittest.TestCase):
pool_at_init = [pool]
client = Mock(name='client')
- class XChannel(FakeRedisKombuChannel):
+ class XChannel(Channel):
def __init__(self, *args, **kwargs):
self._pool = pool_at_init[0]
@@ -570,12 +299,7 @@ class TestRedisChannel(unittest.TestCase):
def _get_client(self):
return lambda *_, **__: client
- def _create_client(self, asynchronous=False):
- if asynchronous:
- return self.Client(connection_pool=self.async_pool)
- return self.Client(connection_pool=self.pool)
-
- class XTransport(FakeRedisKombuTransport):
+ class XTransport(Transport):
Channel = XChannel
conn = Connection(transport=XTransport)
@@ -670,15 +394,14 @@ class TestRedisChannel(unittest.TestCase):
client = client()
def pipe(*args, **kwargs):
- return FakePipelineLite(client)
-
+ return Pipeline(client)
client.pipeline = pipe
client.zrevrangebyscore.return_value = [
(1, 10),
(2, 20),
(3, 30),
]
- qos = kombu_redis.QoS(self.channel)
+ qos = redis.QoS(self.channel)
restore = qos.restore_by_tag = Mock(name='restore_by_tag')
qos._vrestore_count = 1
qos.restore_visible()
@@ -700,13 +423,14 @@ class TestRedisChannel(unittest.TestCase):
assert qos._vrestore_count == 1
qos._vrestore_count = 0
- client.setnx.side_effect = kombu_redis.MutexHeld()
+ client.setnx.side_effect = redis.MutexHeld()
qos.restore_visible()
def test_basic_consume_when_fanout_queue(self):
self.channel.exchange_declare(exchange='txconfan', type='fanout')
self.channel.queue_declare(queue='txconfanq')
self.channel.queue_bind(queue='txconfanq', exchange='txconfan')
+
assert 'txconfanq' in self.channel._fanout_queues
self.channel.basic_consume('txconfanq', False, None, 1)
assert 'txconfanq' in self.channel.active_fanout_queues
@@ -820,7 +544,7 @@ class TestRedisChannel(unittest.TestCase):
c = self.channel.client = Mock()
c.parse_response.return_value = None
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
self.channel._brpop_read()
def test_poll_error(self):
@@ -859,19 +583,19 @@ class TestRedisChannel(unittest.TestCase):
self.channel._put('george', msg1)
client().lpush.assert_called_with(
- self.channel._queue_for_priority('george', 3), dumps(msg1),
+ self.channel._q_for_pri('george', 3), dumps(msg1),
)
msg2 = {'properties': {'priority': 313}}
self.channel._put('george', msg2)
client().lpush.assert_called_with(
- self.channel._queue_for_priority('george', 9), dumps(msg2),
+ self.channel._q_for_pri('george', 9), dumps(msg2),
)
msg3 = {'properties': {}}
self.channel._put('george', msg3)
client().lpush.assert_called_with(
- self.channel._queue_for_priority('george', 0), dumps(msg3),
+ self.channel._q_for_pri('george', 0), dumps(msg3),
)
def test_delete(self):
@@ -883,12 +607,10 @@ class TestRedisChannel(unittest.TestCase):
x._delete('queue', 'exchange', 'routing_key', None)
delete.assert_has_calls([
- call(x._queue_for_priority('queue', pri))
- for pri in kombu_redis.PRIORITY_STEPS
+ call(x._q_for_pri('queue', pri)) for pri in redis.PRIORITY_STEPS
])
- srem.assert_called_with(
- x.keyprefix_queue % ('exchange',),
- x.sep.join(['routing_key', '', 'queue']))
+ srem.assert_called_with(x.keyprefix_queue % ('exchange',),
+ x.sep.join(['routing_key', '', 'queue']))
def test_has_queue(self):
self.channel._create_client = Mock()
@@ -897,8 +619,8 @@ class TestRedisChannel(unittest.TestCase):
exists.return_value = True
assert self.channel._has_queue('foo')
exists.assert_has_calls([
- call(self.channel._queue_for_priority('foo', pri))
- for pri in kombu_redis.PRIORITY_STEPS
+ call(self.channel._q_for_pri('foo', pri))
+ for pri in redis.PRIORITY_STEPS
])
exists.return_value = False
@@ -942,11 +664,9 @@ class TestRedisChannel(unittest.TestCase):
assert self.channel._connparams()['db'] == 124
def test_new_queue_with_auto_delete(self):
- kombu_redis.Channel._new_queue(
- self.channel, 'george', auto_delete=False)
+ redis.Channel._new_queue(self.channel, 'george', auto_delete=False)
assert 'george' not in self.channel.auto_delete_queues
- kombu_redis.Channel._new_queue(
- self.channel, 'elaine', auto_delete=True)
+ redis.Channel._new_queue(self.channel, 'elaine', auto_delete=True)
assert 'elaine' in self.channel.auto_delete_queues
def test_connparams_regular_hostname(self):
@@ -975,21 +695,22 @@ class TestRedisChannel(unittest.TestCase):
cycle.rotate('elaine')
def test_get_client(self):
- kombu_redis_client = kombu_redis.Channel._get_client(self.channel)
- assert kombu_redis_client
+ import redis as R
+ KombuRedis = redis.Channel._get_client(self.channel)
+ assert KombuRedis
- redis_version = getattr(redis, 'VERSION', None)
+ Rv = getattr(R, 'VERSION', None)
try:
- redis.VERSION = (2, 4, 0)
+ R.VERSION = (2, 4, 0)
with pytest.raises(VersionMismatch):
- kombu_redis.Channel._get_client(self.channel)
+ redis.Channel._get_client(self.channel)
finally:
- if redis_version is not None:
- redis.VERSION = redis_version
+ if Rv is not None:
+ R.VERSION = Rv
def test_get_response_error(self):
- kombu_error = kombu_redis.Channel._get_response_error(self.channel)
- assert kombu_error is redis.exceptions.ResponseError
+ from redis.exceptions import ResponseError
+ assert redis.Channel._get_response_error(self.channel) is ResponseError
def test_avail_client(self):
self.channel._pool = Mock()
@@ -1004,7 +725,7 @@ class TestRedisChannel(unittest.TestCase):
transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'}
conn = Mock(name='conn')
loop = Mock(name='loop')
- kombu_redis.Transport.register_with_event_loop(transport, conn, loop)
+ redis.Transport.register_with_event_loop(transport, conn, loop)
transport.cycle.on_poll_init.assert_called_with(loop.poller)
loop.call_repeatedly.assert_called_with(
10, transport.cycle.maybe_restore_messages,
@@ -1024,23 +745,34 @@ class TestRedisChannel(unittest.TestCase):
cycle = transport.cycle = Mock(name='cyle')
cycle.on_readable.return_value = None
- kombu_redis.Transport.on_readable(transport, 13)
+ redis.Transport.on_readable(transport, 13)
cycle.on_readable.assert_called_with(13)
def test_transport_get_errors(self):
- assert kombu_redis.Transport._get_errors(self.connection.transport)
+ assert redis.Transport._get_errors(self.connection.transport)
def test_transport_driver_version(self):
- assert kombu_redis.Transport.driver_version(self.connection.transport)
+ assert redis.Transport.driver_version(self.connection.transport)
def test_transport_get_errors_when_InvalidData_used(self):
+ from redis import exceptions
+
+ class ID(Exception):
+ pass
+
+ DataError = getattr(exceptions, 'DataError', None)
+ InvalidData = getattr(exceptions, 'InvalidData', None)
+ exceptions.InvalidData = ID
+ exceptions.DataError = None
try:
- errors = kombu_redis.Transport._get_errors(
- self.connection.transport)
+ errors = redis.Transport._get_errors(self.connection.transport)
assert errors
- assert redis.exceptions.DataError in errors[1]
- except Exception:
- raise
+ assert ID in errors[1]
+ finally:
+ if DataError is not None:
+ exceptions.DataError = DataError
+ if InvalidData is not None:
+ exceptions.InvalidData = InvalidData
def test_empty_queues_key(self):
channel = self.channel
@@ -1048,14 +780,13 @@ class TestRedisChannel(unittest.TestCase):
key = channel.keyprefix_queue % 'celery'
# Everything is fine, there is a list of queues.
- list_of_queues = 'celery\x06\x16\x06\x16celery'
- channel.client.sadd(key, list_of_queues)
+ channel.client.sadd(key, 'celery\x06\x16\x06\x16celery')
assert channel.get_table('celery') == [
('celery', '', 'celery'),
]
# ... then for some reason, the _kombu.binding.celery key gets lost
- channel.client.srem(key, list_of_queues)
+ channel.client.srem(key)
# which raises a channel error so that the consumer/publisher
# can recover by redeclaring the required entities.
@@ -1068,7 +799,7 @@ class TestRedisChannel(unittest.TestCase):
connparams = conn.default_channel._connparams()
assert issubclass(
connparams['connection_class'],
- redis.UnixDomainSocketConnection,
+ redis.redis.UnixDomainSocketConnection,
)
assert connparams['path'] == '/tmp/redis.sock'
@@ -1095,7 +826,7 @@ class TestRedisChannel(unittest.TestCase):
connparams = conn.default_channel._connparams()
assert issubclass(
connparams['connection_class'],
- redis.SSLConnection,
+ redis.redis.SSLConnection,
)
def test_rediss_connection(self):
@@ -1104,16 +835,16 @@ class TestRedisChannel(unittest.TestCase):
connparams = conn.default_channel._connparams()
assert issubclass(
connparams['connection_class'],
- redis.SSLConnection,
+ redis.redis.SSLConnection,
)
def test_sep_transport_option(self):
- conn_kwargs = dict(
- transport=FakeRedisKombuTransport,
- transport_options={'sep': ':'})
- with Connection(**conn_kwargs) as conn:
+ with Connection(transport=Transport, transport_options={
+ 'sep': ':',
+ }) as conn:
key = conn.default_channel.keyprefix_queue % 'celery'
conn.default_channel.client.sadd(key, 'celery::celery')
+
assert conn.default_channel.sep == ':'
assert conn.default_channel.get_table('celery') == [
('celery', '', 'celery'),
@@ -1121,32 +852,71 @@ class TestRedisChannel(unittest.TestCase):
@skip.unless_module('redis')
-@mock.patch('redis.Connection', FakeRedisConnection)
-class TestRedisConnections(unittest.TestCase):
+class test_Redis:
- def setUp(self):
- self.connection = self.create_connection()
- self.exchange_name = exchange_name = 'test_Redis'
- self.exchange = Exchange(exchange_name, type='direct')
+ def setup(self):
+ self.connection = Connection(transport=Transport)
+ self.exchange = Exchange('test_Redis', type='direct')
self.queue = Queue('test_Redis', self.exchange, 'test_Redis')
- self.queue(self.connection.default_channel).declare()
- self.real_scard = FakeRedisClient.scard
- def tearDown(self):
+ def teardown(self):
self.connection.close()
- self.queue = None
- self.exchange = None
- global _fake_redis_client
- _fake_redis_client = None
- FakeRedisClient.scard = self.real_scard
- def create_connection(self, **kwargs):
- kwargs.setdefault('transport_options', {'fanout_patterns': True})
- return Connection(transport=FakeRedisKombuTransport, **kwargs)
+ @mock.replace_module_value(redis.redis, 'VERSION', [3, 0, 0])
+ def test_publish__get_redispyv3(self):
+ channel = self.connection.channel()
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
+ self.queue(channel).declare()
+
+ producer.publish({'hello': 'world'})
+
+ assert self.queue(channel).get().payload == {'hello': 'world'}
+ assert self.queue(channel).get() is None
+ assert self.queue(channel).get() is None
+ assert self.queue(channel).get() is None
+
+ @mock.replace_module_value(redis.redis, 'VERSION', [2, 5, 10])
+ def test_publish__get_redispyv2(self):
+ channel = self.connection.channel()
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
+ self.queue(channel).declare()
+
+ producer.publish({'hello': 'world'})
+
+ assert self.queue(channel).get().payload == {'hello': 'world'}
+ assert self.queue(channel).get() is None
+ assert self.queue(channel).get() is None
+ assert self.queue(channel).get() is None
+
+ def test_publish__consume(self):
+ connection = Connection(transport=Transport)
+ channel = connection.channel()
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
+ consumer = Consumer(channel, queues=[self.queue])
+
+ producer.publish({'hello2': 'world2'})
+ _received = []
+
+ def callback(message_data, message):
+ _received.append(message_data)
+ message.ack()
+
+ consumer.register_callback(callback)
+ consumer.consume()
+
+ assert channel in channel.connection.cycle._channels
+ try:
+ connection.drain_events(timeout=1)
+ assert _received
+ with pytest.raises(socket.timeout):
+ connection.drain_events(timeout=0.01)
+ finally:
+ channel.close()
def test_purge(self):
- channel = self.connection.default_channel
+ channel = self.connection.channel()
producer = Producer(channel, self.exchange, routing_key='test_Redis')
+ self.queue(channel).declare()
for i in range(10):
producer.publish({'hello': 'world-%s' % (i,)})
@@ -1157,38 +927,38 @@ class TestRedisConnections(unittest.TestCase):
def test_db_values(self):
Connection(virtual_host=1,
- transport=FakeRedisKombuTransport).channel()
+ transport=Transport).channel()
Connection(virtual_host='1',
- transport=FakeRedisKombuTransport).channel()
+ transport=Transport).channel()
Connection(virtual_host='/1',
- transport=FakeRedisKombuTransport).channel()
+ transport=Transport).channel()
with pytest.raises(Exception):
Connection('redis:///foo').channel()
def test_db_port(self):
- c1 = Connection(port=None, transport=FakeRedisKombuTransport).channel()
+ c1 = Connection(port=None, transport=Transport).channel()
c1.close()
- c2 = Connection(port=9999, transport=FakeRedisKombuTransport).channel()
+ c2 = Connection(port=9999, transport=Transport).channel()
c2.close()
def test_close_poller_not_active(self):
- c = Connection(transport=FakeRedisKombuTransport).channel()
+ c = Connection(transport=Transport).channel()
cycle = c.connection.cycle
c.client.connection
c.close()
assert c not in cycle._channels
def test_close_ResponseError(self):
- c = Connection(transport=FakeRedisKombuTransport).channel()
+ c = Connection(transport=Transport).channel()
c.client.bgsave_raises_ResponseError = True
c.close()
def test_close_disconnects(self):
- c = Connection(transport=FakeRedisKombuTransport).channel()
+ c = Connection(transport=Transport).channel()
conn1 = c.client.connection
conn2 = c.subclient.connection
c.close()
@@ -1202,133 +972,62 @@ class TestRedisConnections(unittest.TestCase):
channel.close()
def test_get_client(self):
- conn = Connection(transport=FakeRedisKombuTransport)
- chan = conn.channel()
- assert chan.Client
- assert chan.ResponseError
- assert conn.transport.connection_errors
- assert conn.transport.channel_errors
+ with mock.module_exists(*_redis_modules()):
+ conn = Connection(transport=Transport)
+ chan = conn.channel()
+ assert chan.Client
+ assert chan.ResponseError
+ assert conn.transport.connection_errors
+ assert conn.transport.channel_errors
def test_check_at_least_we_try_to_connect_and_fail(self):
+ import redis
connection = Connection('redis://localhost:65534/')
with pytest.raises(redis.exceptions.ConnectionError):
chan = connection.channel()
chan._size('some_queue')
- def test_redis_queue_lookup_happy_path(self):
- fake_redis_client = _get_fake_redis_client()
- redis_channel = self.connection.default_channel
- routing_key = redis_channel.keyprefix_queue % self.exchange_name
- redis_channel.queue_bind(routing_key, self.exchange_name, routing_key)
- fake_redis_client.sadd(routing_key, routing_key)
- lookup_queue_result = redis_channel._lookup(
- exchange=self.exchange_name,
- routing_key=routing_key,
- default='default_queue')
- assert lookup_queue_result == [routing_key]
-
- def test_redis_queue_lookup_gets_default_when_queue_doesnot_exist(self):
- # Given: A test redis client and channel
- fake_redis_client = _get_fake_redis_client()
- redis_channel = self.connection.default_channel
- # Given: The default queue is set:
- default_queue = 'default_queue'
- redis_channel.deadletter_queue = default_queue
- # Determine the routing key
- routing_key = redis_channel.keyprefix_queue % self.exchange
- fake_redis_client.sadd(routing_key, "DoesNotExist")
- lookup_queue_result = redis_channel._lookup(
- exchange=self.exchange_name,
- routing_key=routing_key,
- default=default_queue)
- assert lookup_queue_result == [redis_channel.deadletter_queue]
-
- def test_redis_queue_lookup_gets_queue_when_exchange_doesnot_exist(self):
- # Given: A test redis client and channel
- fake_redis_client = _get_fake_redis_client()
- redis_channel = self.connection.default_channel
- # Given: The default queue is set:
- default_queue = 'default_queue'
- redis_channel.deadletter_queue = default_queue
- # Determine the routing key
- routing_key = redis_channel.keyprefix_queue % self.exchange
- fake_redis_client.sadd(routing_key, routing_key)
- lookup_queue_result = redis_channel._lookup(
- exchange=None,
- routing_key=routing_key,
- default=default_queue)
- assert lookup_queue_result == [routing_key]
-
- def test_redis_queue_lookup_gets_default_when_route_doesnot_exist(self):
- # Given: A test redis client and channel
- fake_redis_client = _get_fake_redis_client()
- redis_channel = self.connection.default_channel
- # Given: The default queue is set:
- default_queue = 'default_queue'
- redis_channel.deadletter_queue = default_queue
- # Determine the routing key
- routing_key = redis_channel.keyprefix_queue % self.exchange
- fake_redis_client.sadd(routing_key, "DoesNotExist")
- lookup_queue_result = redis_channel._lookup(
- exchange=None,
- routing_key=None,
- default=None)
- assert lookup_queue_result == [default_queue]
-
- def test_redis_queue_lookup_raises_inconsistency_error(self):
- connection = Mock(client=Mock(
- hostname='127.0.0.1',
- virtual_host='/', port=6379,
- transport_options={},
- ))
- redis_channel = FakeRedisKombuChannel(connection)
- exchange = Mock(name='Exchange')
- try:
- redis_channel._lookup(
- exchange=exchange.name,
- routing_key='routing_key',
- default=None)
- except kombu_redis.InconsistencyError:
- pass # This is expected
- else:
- raise("Redis test did not raise expected InconsistencyError!")
-
- def test_redis_queue_lookup_client_raises_key_error_gets_default(self):
- fake_redis_client = _get_fake_redis_client()
- fake_redis_client.scard = Mock(side_effect=KeyError)
- redis_channel = self.connection.default_channel
- routing_key = redis_channel.keyprefix_queue % self.exchange
- redis_channel.queue_bind(routing_key, self.exchange_name, routing_key)
- fake_redis_client.sadd(routing_key, routing_key)
- default_queue_name = 'default_queue'
- lookup_queue_result = redis_channel._lookup(
- exchange=self.exchange_name,
- routing_key=routing_key,
- default=default_queue_name)
- assert lookup_queue_result == [default_queue_name]
-
- def test_redis_queue_lookup_client_raises_key_error_gets_deadletter(self):
- fake_redis_client = _get_fake_redis_client()
- fake_redis_client.scard = Mock(side_effect=KeyError)
- redis_channel = self.connection.default_channel
- routing_key = redis_channel.keyprefix_queue % self.exchange
- redis_channel.queue_bind(routing_key, self.exchange_name, routing_key)
- fake_redis_client.sadd(routing_key, routing_key)
- default_queue_name = 'deadletter_queue'
- redis_channel.deadletter_queue = default_queue_name
- lookup_queue_result = redis_channel._lookup(
- exchange=self.exchange_name,
- routing_key=routing_key,
- default=None)
- assert lookup_queue_result == [default_queue_name]
+
+def _redis_modules():
+
+ class ConnectionError(Exception):
+ pass
+
+ class AuthenticationError(Exception):
+ pass
+
+ class InvalidData(Exception):
+ pass
+
+ class InvalidResponse(Exception):
+ pass
+
+ class ResponseError(Exception):
+ pass
+
+ exceptions = types.ModuleType(bytes_if_py2('redis.exceptions'))
+ exceptions.ConnectionError = ConnectionError
+ exceptions.AuthenticationError = AuthenticationError
+ exceptions.InvalidData = InvalidData
+ exceptions.InvalidResponse = InvalidResponse
+ exceptions.ResponseError = ResponseError
+
+ class Redis(object):
+ pass
+
+ myredis = types.ModuleType(bytes_if_py2('redis'))
+ myredis.exceptions = exceptions
+ myredis.Redis = Redis
+
+ return myredis, exceptions
@skip.unless_module('redis')
-class TestKombuRedisMultiChannelPoller(unittest.TestCase):
+class test_MultiChannelPoller:
- def setUp(self):
- self.Poller = kombu_redis.MultiChannelPoller
+ def setup(self):
+ self.Poller = redis.MultiChannelPoller
def test_on_poll_start(self):
p = self.Poller()
@@ -1384,7 +1083,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
redis_ctx_mock.__exit__ = Mock()
redis_ctx_mock.__enter__ = Mock(return_value=redis_client_mock)
chan1.conn_or_acquire.return_value = redis_ctx_mock
- qos = kombu_redis.QoS(chan1)
+ qos = redis.QoS(chan1)
qos.visibility_timeout = timeout
qos.restore_visible()
redis_client_mock.zrevrangebyscore\
@@ -1398,17 +1097,17 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
chan.handlers = {'BRPOP': Mock(name='BRPOP')}
chan.qos.can_consume.return_value = False
- p.handle_event(13, kombu_redis.READ)
+ p.handle_event(13, redis.READ)
chan.handlers['BRPOP'].assert_not_called()
chan.qos.can_consume.return_value = True
- p.handle_event(13, kombu_redis.READ)
+ p.handle_event(13, redis.READ)
chan.handlers['BRPOP'].assert_called_with()
- p.handle_event(13, kombu_redis.ERR)
+ p.handle_event(13, redis.ERR)
chan._poll_error.assert_called_with('BRPOP')
- p.handle_event(13, ~(kombu_redis.READ | kombu_redis.ERR))
+ p.handle_event(13, ~(redis.READ | redis.ERR))
def test_fds(self):
p = self.Poller()
@@ -1535,12 +1234,12 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
def test_get_no_actions(self):
p, channel = self.create_get()
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
p.get(Mock())
def test_qos_reject(self):
p, channel = self.create_get()
- qos = kombu_redis.QoS(channel)
+ qos = redis.QoS(channel)
qos.ack = Mock(name='Qos.ack')
qos.reject(1234)
qos.ack.assert_called_with(1234)
@@ -1549,7 +1248,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
p, channel = self.create_get(queues=['a_queue'])
channel.qos.can_consume.return_value = True
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
p.get(Mock())
p._register_BRPOP.assert_called_with(channel)
@@ -1558,7 +1257,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
p, channel = self.create_get(queues=['a_queue'])
channel.qos.can_consume.return_value = False
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
p.get(Mock())
p._register_BRPOP.assert_not_called()
@@ -1566,7 +1265,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
def test_get_listen(self):
p, channel = self.create_get(fanouts=['f_queue'])
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
p.get(Mock())
p._register_LISTEN.assert_called_with(channel)
@@ -1575,7 +1274,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
p, channel = self.create_get(events=[(1, eventio.ERR)])
p._fd_to_chan[1] = (channel, 'BRPOP')
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
p.get(Mock())
channel._poll_error.assert_called_with('BRPOP')
@@ -1585,14 +1284,14 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase):
(1, eventio.ERR)])
p._fd_to_chan[1] = (channel, 'BRPOP')
- with pytest.raises(kombu_redis.Empty):
+ with pytest.raises(redis.Empty):
p.get(Mock())
channel._poll_error.assert_called_with('BRPOP')
@skip.unless_module('redis')
-class TestKombuRedisMutex(unittest.TestCase):
+class test_Mutex:
def test_mutex(self, lock_id='xxx'):
client = Mock(name='client')
@@ -1604,13 +1303,13 @@ class TestKombuRedisMutex(unittest.TestCase):
pipe = client.pipeline.return_value
pipe.get.return_value = lock_id
held = False
- with kombu_redis.Mutex(client, 'foo1', 100):
+ with redis.Mutex(client, 'foo1', 100):
held = True
assert held
client.setnx.assert_called_with('foo1', lock_id)
pipe.get.return_value = 'yyy'
held = False
- with kombu_redis.Mutex(client, 'foo1', 100):
+ with redis.Mutex(client, 'foo1', 100):
held = True
assert held
@@ -1618,93 +1317,35 @@ class TestKombuRedisMutex(unittest.TestCase):
client.expire.reset_mock()
pipe.get.return_value = lock_id
client.setnx.return_value = False
- with pytest.raises(kombu_redis.MutexHeld):
+ with pytest.raises(redis.MutexHeld):
held = False
- with kombu_redis.Mutex(client, 'foo1', '100'):
+ with redis.Mutex(client, 'foo1', '100'):
held = True
assert not held
client.ttl.return_value = 0
- with pytest.raises(kombu_redis.MutexHeld):
+ with pytest.raises(redis.MutexHeld):
held = False
- with kombu_redis.Mutex(client, 'foo1', '100'):
+ with redis.Mutex(client, 'foo1', '100'):
held = True
assert not held
client.expire.assert_called()
# Wins but raises WatchError (and that is ignored)
client.setnx.return_value = True
- pipe.watch.side_effect = redis.WatchError()
+ pipe.watch.side_effect = redis.redis.WatchError()
held = False
- with kombu_redis.Mutex(client, 'foo1', 100):
+ with redis.Mutex(client, 'foo1', 100):
held = True
assert held
-class TestRedisProducerConsumer(unittest.TestCase):
- def setUp(self):
- self.connection = self.create_connection()
- self.channel = self.connection.default_channel
- self.routing_key = routing_key = 'test_redis_producer'
- self.exchange_name = exchange_name = 'test_redis_producer'
- self.exchange = Exchange(exchange_name, type='direct')
- self.queue = Queue(routing_key, self.exchange, routing_key)
-
- self.queue(self.connection.default_channel).declare()
- self.channel.queue_bind(routing_key, self.exchange_name, routing_key)
-
- def create_connection(self, **kwargs):
- kwargs.setdefault('transport_options', {'fanout_patterns': True})
- return Connection(transport=FakeRedisKombuTransportLite, **kwargs)
-
- def teardown(self):
- self.connection.close()
-
- def test_publish__get(self):
- channel = self.connection.channel()
- producer = Producer(channel, self.exchange,
- routing_key=self.routing_key)
- self.queue(channel).declare()
-
- producer.publish({'hello': 'world'})
-
- assert self.queue(channel).get().payload == {'hello': 'world'}
- assert self.queue(channel).get() is None
- assert self.queue(channel).get() is None
- assert self.queue(channel).get() is None
-
- def test_publish__consume(self):
- connection = self.create_connection()
- channel = connection.default_channel
- producer = Producer(channel, self.exchange,
- routing_key=self.routing_key)
- consumer = Consumer(channel, queues=[self.queue])
-
- producer.publish({'hello2': 'world2'})
- _received = []
-
- def callback(message_data, message):
- _received.append(message_data)
- message.ack()
-
- consumer.register_callback(callback)
- consumer.consume()
-
- assert channel in channel.connection.cycle._channels
- try:
- connection.drain_events(timeout=1)
- assert _received
- with pytest.raises(socket.timeout):
- connection.drain_events(timeout=0.01)
- finally:
- channel.close()
-
-
@skip.unless_module('redis.sentinel')
-class TestRedisSentinel(unittest.TestCase):
+class test_RedisSentinel:
def test_method_called(self):
- with patch.object(kombu_redis.SentinelChannel,
- '_sentinel_managed_pool') as p:
+ from kombu.transport.redis import SentinelChannel
+
+ with patch.object(SentinelChannel, '_sentinel_managed_pool') as p:
connection = Connection(
'sentinel://localhost:65534/',
transport_options={
@@ -1768,11 +1409,13 @@ class TestRedisSentinel(unittest.TestCase):
master_for().connection_pool.get_connection.assert_called()
def test_can_create_connection(self):
+ from redis.exceptions import ConnectionError
+
connection = Connection(
'sentinel://localhost:65534/',
transport_options={
'master_name': 'not_important',
},
)
- with pytest.raises(redis.exceptions.ConnectionError):
+ with pytest.raises(ConnectionError):
connection.channel()