diff options
author | Asif Saif Uddin <auvipy@gmail.com> | 2019-10-10 17:35:48 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-10 17:35:48 +0600 |
commit | 9fd41a1e714a0db1c879fe0c4373c474bf18f9ff (patch) | |
tree | a9d12ed62d76a23713802e6dd9fff6b9365e2334 | |
parent | b51d1d678e198a80d7e5fd95f32674c7d8e04a75 (diff) | |
download | kombu-revert-1089-issue-1087-redis-fix.tar.gz |
Revert "Issue #1087 redis fix (#1089)"revert-1089-issue-1087-redis-fix
This reverts commit 2f6f5f6a5df1cf52bf8fd45ee9fb3c93d793d637.
-rw-r--r-- | AUTHORS | 1 | ||||
-rw-r--r-- | kombu/transport/redis.py | 93 | ||||
-rw-r--r-- | kombu/transport/virtual/base.py | 26 | ||||
-rw-r--r-- | kombu/transport/virtual/exchange.py | 2 | ||||
-rw-r--r-- | requirements/test.txt | 1 | ||||
-rw-r--r-- | t/unit/transport/test_redis.py | 863 |
6 files changed, 306 insertions, 680 deletions
@@ -90,7 +90,6 @@ Marcin Lulek (ergo) <info@webreactor.eu> Marcin Puhacz <marcin.puhacz@gmail.com> Mark Lavin <mlavin@caktusgroup.com> markow <markow@red-sky.pl> -Matt Davis <matteius@gmail.com> Matt Wise <wise@wiredgeek.net> Maxime Rouyrre <rouyrre+git@gmail.com> mdk <luc.mdk@gmail.com> diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index b4a3d362..37524b5b 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -3,12 +3,12 @@ from __future__ import absolute_import, unicode_literals import numbers import socket -import warnings from bisect import bisect from collections import namedtuple from contextlib import contextmanager from time import time + from vine import promise from kombu.exceptions import InconsistencyError, VersionMismatch @@ -25,7 +25,6 @@ from kombu.utils.uuid import uuid from kombu.utils.compat import _detect_environment from . import virtual -from .virtual.base import UndeliverableWarning, UNDELIVERABLE_FMT try: import redis @@ -147,8 +146,12 @@ class QoS(virtual.QoS): def append(self, message, delivery_tag): delivery = message.delivery_info EX, RK = delivery['exchange'], delivery['routing_key'] - # Redis-py changed the format of zadd args in v3.0.0 to be like this - zadd_args = [{delivery_tag: time()}] + # TODO: Remove this once we soley on Redis-py 3.0.0+ + if redis.VERSION[0] >= 3: + # Redis-py changed the format of zadd args in v3.0.0 + zadd_args = [{delivery_tag: time()}] + else: + zadd_args = [time(), delivery_tag] with self.pipe_or_acquire() as pipe: pipe.zadd(self.unacked_index_key, *zadd_args) \ @@ -710,8 +713,7 @@ class Channel(virtual.Channel): queues = self._queue_cycle.consume(len(self.active_queues)) if not queues: return - _q_for_pri = self._queue_for_priority - keys = [_q_for_pri(queue, pri) for pri in self.priority_steps + keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps for queue in queues] + [timeout or 0] self._in_poll = self.client.connection self.client.connection.send_command('BRPOP', *keys) @@ -747,8 +749,7 @@ class Channel(virtual.Channel): def _get(self, queue): with self.conn_or_acquire() as client: for pri in self.priority_steps: - queue_name = self._queue_for_priority(queue, pri) - item = client.rpop(queue_name) + item = client.rpop(self._q_for_pri(queue, pri)) if item: return loads(bytes_to_str(item)) raise Empty() @@ -757,21 +758,14 @@ class Channel(virtual.Channel): with self.conn_or_acquire() as client: with client.pipeline() as pipe: for pri in self.priority_steps: - queue_name = self._queue_for_priority(queue, pri) - pipe = pipe.llen(queue_name) + pipe = pipe.llen(self._q_for_pri(queue, pri)) sizes = pipe.execute() - size = sum(size for size in sizes + return sum(size for size in sizes if isinstance(size, numbers.Integral)) - return size - def _queue_for_priority(self, queue, pri): + def _q_for_pri(self, queue, pri): pri = self.priority(pri) - if pri: - queue_args = (queue, self.sep, pri) - else: - queue_args = (queue, '', '') - priority_queue_name = '%s%s%s' % queue_args - return priority_queue_name + return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', '')) def priority(self, n): steps = self.priority_steps @@ -782,7 +776,7 @@ class Channel(virtual.Channel): pri = self._get_message_priority(message, reverse=False) with self.conn_or_acquire() as client: - client.lpush(self._queue_for_priority(queue, pri), dumps(message)) + client.lpush(self._q_for_pri(queue, pri), dumps(message)) def _put_fanout(self, exchange, message, routing_key, **kwargs): """Deliver fanout message.""" @@ -817,14 +811,14 @@ class Channel(virtual.Channel): queue or ''])) with client.pipeline() as pipe: for pri in self.priority_steps: - pipe = pipe.delete(self._queue_for_priority(queue, pri)) + pipe = pipe.delete(self._q_for_pri(queue, pri)) pipe.execute() def _has_queue(self, queue, **kwargs): with self.conn_or_acquire() as client: with client.pipeline() as pipe: for pri in self.priority_steps: - pipe = pipe.exists(self._queue_for_priority(queue, pri)) + pipe = pipe.exists(self._q_for_pri(queue, pri)) return any(pipe.execute()) def get_table(self, exchange): @@ -835,55 +829,30 @@ class Channel(virtual.Channel): raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key)) return [tuple(bytes_to_str(val).split(self.sep)) for val in values] - def _lookup(self, exchange, routing_key, default=None): - """Find all queues matching `routing_key` for the given `exchange`. + def _lookup_direct(self, exchange, routing_key): + if not exchange: + return [routing_key] - Returns: - str: queue name -- must return the string `default` - if no queues matched. - """ + key = self.keyprefix_queue % exchange pattern = '' - result = [] - if default is None: - default = self.deadletter_queue - if not exchange: # anon exchange - return [routing_key or default] - queue = routing_key - redis_key = self.keyprefix_queue % exchange - try: - queue_bind = self.sep.join([ - routing_key or '', - pattern, - queue or '', - ]) - with self.conn_or_acquire() as client: - if not client.scard(redis_key): - pass # Do not check if its a member because set is empty - elif client.sismember(redis_key, queue_bind): - result = [queue] - except KeyError: - pass - - if not result: - if default is not None: - warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format( - exchange=exchange, routing_key=routing_key)), - ) - self._new_queue(default) - result = [default] - else: - raise InconsistencyError(NO_ROUTE_ERROR.format( - exchange, redis_key)) + queue_bind = self.sep.join([ + routing_key or '', + pattern, + queue or '', + ]) + with self.conn_or_acquire() as client: + if client.sismember(key, queue_bind): + return [queue] - return result + return [] def _purge(self, queue): with self.conn_or_acquire() as client: with client.pipeline() as pipe: for pri in self.priority_steps: - priority_queue = self._queue_for_priority(queue, pri) - pipe = pipe.llen(priority_queue).delete(priority_queue) + priq = self._q_for_pri(queue, pri) + pipe = pipe.llen(priq).delete(priq) sizes = pipe.execute() return sum(sizes[::2]) diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py index fbfc1563..9450cad2 100644 --- a/kombu/transport/virtual/base.py +++ b/kombu/transport/virtual/base.py @@ -710,20 +710,36 @@ class Channel(AbstractChannel, base.StdChannel): return [routing_key or default] try: - result = self.typeof(exchange).lookup( + R = self.typeof(exchange).lookup( self.get_table(exchange), exchange, routing_key, default, ) except KeyError: - result = [] + R = [] - if not result and default is not None: + if not R and default is not None: warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format( exchange=exchange, routing_key=routing_key)), ) self._new_queue(default) - result = [default] - return result + R = [default] + return R + + def _lookup_direct(self, exchange, routing_key): + """Find queue matching `routing_key` for given direct `exchange`. + + Returns: + str: queue name + """ + if not exchange: + return [routing_key] + + return self.exchange_types['direct'].lookup( + table=self.get_table(exchange), + exchange=exchange, + routing_key=routing_key, + default=None, + ) def _restore(self, message): """Redeliver message to its original destination.""" diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index 51909a1d..4f9e4fb7 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -65,7 +65,7 @@ class DirectExchange(ExchangeType): } def deliver(self, message, exchange, routing_key, **kwargs): - _lookup = self.channel._lookup + _lookup = self.channel._lookup_direct _put = self.channel._put for queue in _lookup(exchange, routing_key): _put(queue, message, **kwargs) diff --git a/requirements/test.txt b/requirements/test.txt index 91b210a0..ef7aa4e5 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -3,4 +3,3 @@ case>=1.5.2 pytest pytest-sugar Pyro4 -fakeredis==1.0.4 diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 1c4f6130..1dcfe71e 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -1,35 +1,22 @@ from __future__ import absolute_import, unicode_literals -import fakeredis import pytest -import redis -import unittest import socket +import types -from array import array -from case import ANY, ContextMock, Mock, call, mock, skip, patch from collections import defaultdict -from contextlib import contextmanager from itertools import count +from case import ANY, ContextMock, Mock, call, mock, skip, patch + from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.exceptions import InconsistencyError, VersionMismatch -from kombu.five import Empty, Queue as _Queue +from kombu.five import Empty, Queue as _Queue, bytes_if_py2 from kombu.transport import virtual from kombu.utils import eventio # patch poll from kombu.utils.json import dumps -_fake_redis_client = None - - -def _get_fake_redis_client(): - global _fake_redis_client - if _fake_redis_client is None: - _fake_redis_client = FakeRedisClient() - return _fake_redis_client - - class _poll(eventio._select): def register(self, fd, flags): @@ -46,177 +33,33 @@ class _poll(eventio._select): eventio.poll = _poll # must import after poller patch, pep8 complains -from kombu.transport import redis as kombu_redis # noqa +from kombu.transport import redis # noqa class ResponseError(Exception): pass -class DummyParser(object): - - def __init__(self, *args, **kwargs): - self.socket_read_size = 1 - self.encoder = None - self._sock = None - self._buffer = None - - def on_disconnect(self): - self.socket_read_size = 1 - self.encoder = None - self._sock = None - self._buffer = None - - def on_connect(self, connection): - pass - - -class FakeRedisSocket(fakeredis._server.FakeSocket): - blocking = True - filenos = count(30) - - def __init__(self, server): - super(FakeRedisSocket, self).__init__(server) - self._server = server - self._fileno = next(self.filenos) - self.data = [] - self.connection = None - self.channel = None - self.transport_options = {} - self.hostname = None - self.port = None - self.password = None - self.virtual_host = '/' - self.max_connections = 10 - self.ssl = None - - -class FakeRedisConnection(fakeredis.FakeConnection): - disconnected = False - default_port = 6379 - channel_max = 65535 - - def __init__(self, client, server, **kwargs): - kwargs['parser_class'] = DummyParser - super(fakeredis.FakeConnection, self).__init__(**kwargs) - if client is None: - client = _get_fake_redis_client() - self.client = client - if server is None: - server = fakeredis.FakeServer() - self._server = server - self._sock = FakeRedisSocket(server=server) - try: - self.on_connect() - except redis.exceptions.RedisError: - # clean up after any error in on_connect - self.disconnect() - raise - self._parser = () - self._avail_channel_ids = array( - virtual.base.ARRAY_TYPE_H, range(self.channel_max, 0, -1), - ) - self.cycle = kombu_redis.MultiChannelPoller() - conn_errs, channel_errs = kombu_redis.get_redis_error_classes() - self.connection_errors, self.channel_errors = conn_errs, channel_errs - - def disconnect(self): - self.disconnected = True - - -class FakeRedisConnectionPool(redis.ConnectionPool): - def __init__(self, connection_class, max_connections=None, - **connection_kwargs): - connection_class = FakeRedisConnection - connection_kwargs['client'] = None - connection_kwargs['server'] = None - self._connections = [] - super(FakeRedisConnectionPool, self).__init__( - connection_class=connection_class, - max_connections=max_connections, - **connection_kwargs - ) - - def get_connection(self, *args, **kwargs): - connection = self.connection_class(**self.connection_kwargs) - self._connections.append(connection) - return connection - - def release(self, connection): - pass - - -class FakeRedisClient(fakeredis.FakeStrictRedis): +class Client(object): queues = {} + sets = defaultdict(set) + hashes = defaultdict(dict) shard_hint = None def __init__(self, db=None, port=None, connection_pool=None, **kwargs): self._called = [] self._connection = None self.bgsave_raises_ResponseError = False - self.server = server = fakeredis.FakeServer() - connection_pool = FakeRedisConnectionPool(FakeRedisConnection) - self.connection_pool = connection_pool - super(FakeRedisClient, self).__init__( - db=db, port=port, connection_pool=connection_pool, server=server) - self.connection = FakeRedisConnection(self, server) - self.response_callbacks = dict() - - def __del__(self, key=None): - if key: - self.delete(key) - - def ping(self, *args, **kwargs): - return True - - def pipeline(self): - return FakePipeline(self.server, self.connection_pool, [], '1234', '') - - def set_response_callback(self, command, callback): - pass - - def _new_queue(self, queue, auto_delete=False, **kwargs): - self.queues[queue] = _Queue() - if auto_delete: - self.auto_delete_queues.add(queue) - - def rpop(self, key): - try: - return self.queues[key].get_nowait() - except (KeyError, Empty): - pass - - def llen(self, key): - try: - return self.queues[key].qsize() - except KeyError: - return 0 - - def lpush(self, key, value): - self.queues[key].put_nowait(value) + self.connection = self._sconnection(self) - def pubsub(self, *args, **kwargs): - self.connection_pool = FakeRedisConnectionPool(FakeRedisConnection) - return self + def bgsave(self): + self._called.append('BGSAVE') + if self.bgsave_raises_ResponseError: + raise ResponseError() def delete(self, key): self.queues.pop(key, None) - -class FakeRedisClientLite(object): - """The original FakeRedis client from Kombu to support the - Producer/Consumer TestCases, preferred to use FakeRedisClient.""" - queues = {} - sets = defaultdict(set) - hashes = defaultdict(dict) - shard_hint = None - - def __init__(self, db=None, port=None, connection_pool=None, **kwargs): - self._called = [] - self._connection = None - self.bgsave_raises_ResponseError = False - self.connection = self._sconnection(self) - def exists(self, key): return key in self.queues or key in self.sets @@ -233,9 +76,14 @@ class FakeRedisClientLite(object): self.sets[key].add(member) def zadd(self, key, *args): - (mapping,) = args - for item in mapping: - self.sets[key].add(item) + if redis.redis.VERSION[0] >= 3: + (mapping,) = args + for item in mapping: + self.sets[key].add(item) + else: + # TODO: remove me when we drop support for Redis-py v2 + (score1, member1) = args + self.sets[key].add(member1) def smembers(self, key): return self.sets.get(key, set()) @@ -243,9 +91,6 @@ class FakeRedisClientLite(object): def sismember(self, name, value): return value in self.sets.get(name, set()) - def scard(self, key): - return len(self.sets.get(key, set())) - def ping(self, *args, **kwargs): return True @@ -293,7 +138,7 @@ class FakeRedisClientLite(object): return k in self._called def pipeline(self): - return FakePipelineLite(self) + return Pipeline(self) def encode(self, value): return str(value) @@ -328,8 +173,22 @@ class FakeRedisClientLite(object): def send_command(self, cmd, *args): self._sock.data.append((cmd, args)) + def info(self): + return {'foo': 1} + + def pubsub(self, *args, **kwargs): + connection = self.connection + + class ConnectionPool(object): -class FakePipelineLite(object): + def get_connection(self, *args, **kwargs): + return connection + self.connection_pool = ConnectionPool() + + return self + + +class Pipeline(object): def __init__(self, client): self.client = client @@ -357,84 +216,10 @@ class FakePipelineLite(object): return [fun(*args, **kwargs) for fun, args, kwargs in stack] -class FakePipeline(redis.client.Pipeline): - - def __init__(self, server, connection_pool, - response_callbacks, transaction, shard_hint): - if not server: - server = fakeredis.FakeServer() - self._server = server - correct_pool_instance = isinstance( - connection_pool, FakeRedisConnectionPool) - if connection_pool is not None and not correct_pool_instance: - connection_pool = FakeRedisConnectionPool(FakeRedisConnection) - self.connection_pool = connection_pool - self.connection = FakeRedisConnection(self, server) - self.client = connection_pool.get_connection().client - self.response_callbacks = response_callbacks - self.transaction = transaction - self.shard_hint = shard_hint - self.watching = False - self.reset() - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - pass - - def __getattr__(self, key): - if key not in self.__dict__: - - def _add(*args, **kwargs): - self.command_stack.append( - (getattr(self.connection.client, key), args, kwargs)) - return self - - return _add - return self.__dict__[key] - - def reset(self): - # Do nothing with the real connection - self.command_stack = [] - self.scripts = set() - - def execute(self): - stack = list(self.command_stack) - all_cmds = self.connection.pack_commands( - [args for args, _ in self.command_stack]) - self.connection.send_packed_command(all_cmds) - - response = [] - for cmd in all_cmds: - try: - response.append( - self.parse_response(self.connection, cmd)) - except ResponseError: - import sys - response.append(sys.exc_info()[1]) - - self.raise_first_error(self.command_stack, response) - results = [] - for t, kwargs in stack: - redis_func_name = t[0] - redis_func_name = redis_func_name.lower() - if redis_func_name == 'del': - redis_func_name = 'delete' - args = t[1:] - fun = getattr(self.client, redis_func_name) - r = fun(*args, **kwargs) - results.append(r) - - self.command_stack[:] = [] - self.reset() - return results - - -class FakeRedisKombuChannelLite(kombu_redis.Channel): +class Channel(redis.Channel): def _get_client(self): - return FakeRedisClientLite + return Client def _get_pool(self, asynchronous=False): return Mock() @@ -444,97 +229,42 @@ class FakeRedisKombuChannelLite(kombu_redis.Channel): def _new_queue(self, queue, **kwargs): for pri in self.priority_steps: - self.client._new_queue(self._queue_for_priority(queue, pri)) + self.client._new_queue(self._q_for_pri(queue, pri)) def pipeline(self): - return FakePipelineLite(FakeRedisClientLite()) - - -class FakeRedisKombuChannel(kombu_redis.Channel): - _fanout_queues = {} + return Pipeline(Client()) - def __init__(self, *args, **kwargs): - super(FakeRedisKombuChannel, self).__init__(*args, **kwargs) - def _get_client(self): - return FakeRedisClient - - def _create_client(self, asynchronous=False): - global _fake_redis_client - if _fake_redis_client is None: - _fake_redis_client = self._get_client()() - return _fake_redis_client - - @contextmanager - def conn_or_acquire(self, client=None): - if client: - yield client - else: - yield self._create_client() - - def _get_pool(self, asynchronous=False): - params = self._connparams(asynchronous=asynchronous) - self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db']) - return FakeRedisConnectionPool(**params) - - def _get_response_error(self): - return ResponseError - - def _new_queue(self, queue, **kwargs): - for pri in self.priority_steps: - self.client._new_queue(self._queue_for_priority(queue, pri)) - - def pipeline(self): - yield _get_fake_redis_client().pipeline() - - def basic_publish(self, message, exchange='', routing_key='', **kwargs): - self._inplace_augment_message(message, exchange, routing_key) - # anon exchange: routing_key is the destination queue - return self._put(routing_key, message, **kwargs) - - -class FakeRedisKombuTransportLite(kombu_redis.Transport): - Channel = FakeRedisKombuChannelLite - - def __init__(self, *args, **kwargs): - super(FakeRedisKombuTransportLite, self).__init__(*args, **kwargs) +class Transport(redis.Transport): + Channel = Channel def _get_errors(self): return ((KeyError,), (IndexError,)) -class FakeRedisKombuTransport(FakeRedisKombuTransportLite): - Channel = FakeRedisKombuChannel - - @skip.unless_module('redis') -class TestRedisChannel(unittest.TestCase): +class test_Channel: - def setUp(self): + def setup(self): self.connection = self.create_connection() self.channel = self.connection.default_channel - def tearDown(self): - self.connection = None - self.channel = None - global _fake_redis_client - _fake_redis_client = None - def create_connection(self, **kwargs): kwargs.setdefault('transport_options', {'fanout_patterns': True}) - return Connection(transport=FakeRedisKombuTransport, **kwargs) + return Connection(transport=Transport, **kwargs) def _get_one_delivery_tag(self, n='test_uniq_tag'): - chan = self.connection.default_channel - chan.exchange_declare(n) - chan.queue_declare(n) - chan.queue_bind(n, n, n) - msg = chan.prepare_message('quick brown fox') - chan.basic_publish(msg, n, n) - payload = chan._get(n) - assert payload - pymsg = chan.message_to_python(payload) - return pymsg.delivery_tag + with self.create_connection() as conn1: + chan = conn1.default_channel + chan.exchange_declare(n) + chan.queue_declare(n) + chan.queue_bind(n, n, n) + msg = chan.prepare_message('quick brown fox') + chan.basic_publish(msg, n, n) + payload = chan._get(n) + assert payload + pymsg = chan.message_to_python(payload) + return pymsg.delivery_tag def test_delivery_tag_is_uuid(self): seen = set() @@ -547,10 +277,9 @@ class TestRedisChannel(unittest.TestCase): assert len(tag) == 36 def test_disable_ack_emulation(self): - conn = Connection( - transport=FakeRedisKombuTransport, - transport_options={'ack_emulation': False} - ) + conn = Connection(transport=Transport, transport_options={ + 'ack_emulation': False, + }) chan = conn.channel() assert not chan.ack_emulation @@ -561,7 +290,7 @@ class TestRedisChannel(unittest.TestCase): pool_at_init = [pool] client = Mock(name='client') - class XChannel(FakeRedisKombuChannel): + class XChannel(Channel): def __init__(self, *args, **kwargs): self._pool = pool_at_init[0] @@ -570,12 +299,7 @@ class TestRedisChannel(unittest.TestCase): def _get_client(self): return lambda *_, **__: client - def _create_client(self, asynchronous=False): - if asynchronous: - return self.Client(connection_pool=self.async_pool) - return self.Client(connection_pool=self.pool) - - class XTransport(FakeRedisKombuTransport): + class XTransport(Transport): Channel = XChannel conn = Connection(transport=XTransport) @@ -670,15 +394,14 @@ class TestRedisChannel(unittest.TestCase): client = client() def pipe(*args, **kwargs): - return FakePipelineLite(client) - + return Pipeline(client) client.pipeline = pipe client.zrevrangebyscore.return_value = [ (1, 10), (2, 20), (3, 30), ] - qos = kombu_redis.QoS(self.channel) + qos = redis.QoS(self.channel) restore = qos.restore_by_tag = Mock(name='restore_by_tag') qos._vrestore_count = 1 qos.restore_visible() @@ -700,13 +423,14 @@ class TestRedisChannel(unittest.TestCase): assert qos._vrestore_count == 1 qos._vrestore_count = 0 - client.setnx.side_effect = kombu_redis.MutexHeld() + client.setnx.side_effect = redis.MutexHeld() qos.restore_visible() def test_basic_consume_when_fanout_queue(self): self.channel.exchange_declare(exchange='txconfan', type='fanout') self.channel.queue_declare(queue='txconfanq') self.channel.queue_bind(queue='txconfanq', exchange='txconfan') + assert 'txconfanq' in self.channel._fanout_queues self.channel.basic_consume('txconfanq', False, None, 1) assert 'txconfanq' in self.channel.active_fanout_queues @@ -820,7 +544,7 @@ class TestRedisChannel(unittest.TestCase): c = self.channel.client = Mock() c.parse_response.return_value = None - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): self.channel._brpop_read() def test_poll_error(self): @@ -859,19 +583,19 @@ class TestRedisChannel(unittest.TestCase): self.channel._put('george', msg1) client().lpush.assert_called_with( - self.channel._queue_for_priority('george', 3), dumps(msg1), + self.channel._q_for_pri('george', 3), dumps(msg1), ) msg2 = {'properties': {'priority': 313}} self.channel._put('george', msg2) client().lpush.assert_called_with( - self.channel._queue_for_priority('george', 9), dumps(msg2), + self.channel._q_for_pri('george', 9), dumps(msg2), ) msg3 = {'properties': {}} self.channel._put('george', msg3) client().lpush.assert_called_with( - self.channel._queue_for_priority('george', 0), dumps(msg3), + self.channel._q_for_pri('george', 0), dumps(msg3), ) def test_delete(self): @@ -883,12 +607,10 @@ class TestRedisChannel(unittest.TestCase): x._delete('queue', 'exchange', 'routing_key', None) delete.assert_has_calls([ - call(x._queue_for_priority('queue', pri)) - for pri in kombu_redis.PRIORITY_STEPS + call(x._q_for_pri('queue', pri)) for pri in redis.PRIORITY_STEPS ]) - srem.assert_called_with( - x.keyprefix_queue % ('exchange',), - x.sep.join(['routing_key', '', 'queue'])) + srem.assert_called_with(x.keyprefix_queue % ('exchange',), + x.sep.join(['routing_key', '', 'queue'])) def test_has_queue(self): self.channel._create_client = Mock() @@ -897,8 +619,8 @@ class TestRedisChannel(unittest.TestCase): exists.return_value = True assert self.channel._has_queue('foo') exists.assert_has_calls([ - call(self.channel._queue_for_priority('foo', pri)) - for pri in kombu_redis.PRIORITY_STEPS + call(self.channel._q_for_pri('foo', pri)) + for pri in redis.PRIORITY_STEPS ]) exists.return_value = False @@ -942,11 +664,9 @@ class TestRedisChannel(unittest.TestCase): assert self.channel._connparams()['db'] == 124 def test_new_queue_with_auto_delete(self): - kombu_redis.Channel._new_queue( - self.channel, 'george', auto_delete=False) + redis.Channel._new_queue(self.channel, 'george', auto_delete=False) assert 'george' not in self.channel.auto_delete_queues - kombu_redis.Channel._new_queue( - self.channel, 'elaine', auto_delete=True) + redis.Channel._new_queue(self.channel, 'elaine', auto_delete=True) assert 'elaine' in self.channel.auto_delete_queues def test_connparams_regular_hostname(self): @@ -975,21 +695,22 @@ class TestRedisChannel(unittest.TestCase): cycle.rotate('elaine') def test_get_client(self): - kombu_redis_client = kombu_redis.Channel._get_client(self.channel) - assert kombu_redis_client + import redis as R + KombuRedis = redis.Channel._get_client(self.channel) + assert KombuRedis - redis_version = getattr(redis, 'VERSION', None) + Rv = getattr(R, 'VERSION', None) try: - redis.VERSION = (2, 4, 0) + R.VERSION = (2, 4, 0) with pytest.raises(VersionMismatch): - kombu_redis.Channel._get_client(self.channel) + redis.Channel._get_client(self.channel) finally: - if redis_version is not None: - redis.VERSION = redis_version + if Rv is not None: + R.VERSION = Rv def test_get_response_error(self): - kombu_error = kombu_redis.Channel._get_response_error(self.channel) - assert kombu_error is redis.exceptions.ResponseError + from redis.exceptions import ResponseError + assert redis.Channel._get_response_error(self.channel) is ResponseError def test_avail_client(self): self.channel._pool = Mock() @@ -1004,7 +725,7 @@ class TestRedisChannel(unittest.TestCase): transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'} conn = Mock(name='conn') loop = Mock(name='loop') - kombu_redis.Transport.register_with_event_loop(transport, conn, loop) + redis.Transport.register_with_event_loop(transport, conn, loop) transport.cycle.on_poll_init.assert_called_with(loop.poller) loop.call_repeatedly.assert_called_with( 10, transport.cycle.maybe_restore_messages, @@ -1024,23 +745,34 @@ class TestRedisChannel(unittest.TestCase): cycle = transport.cycle = Mock(name='cyle') cycle.on_readable.return_value = None - kombu_redis.Transport.on_readable(transport, 13) + redis.Transport.on_readable(transport, 13) cycle.on_readable.assert_called_with(13) def test_transport_get_errors(self): - assert kombu_redis.Transport._get_errors(self.connection.transport) + assert redis.Transport._get_errors(self.connection.transport) def test_transport_driver_version(self): - assert kombu_redis.Transport.driver_version(self.connection.transport) + assert redis.Transport.driver_version(self.connection.transport) def test_transport_get_errors_when_InvalidData_used(self): + from redis import exceptions + + class ID(Exception): + pass + + DataError = getattr(exceptions, 'DataError', None) + InvalidData = getattr(exceptions, 'InvalidData', None) + exceptions.InvalidData = ID + exceptions.DataError = None try: - errors = kombu_redis.Transport._get_errors( - self.connection.transport) + errors = redis.Transport._get_errors(self.connection.transport) assert errors - assert redis.exceptions.DataError in errors[1] - except Exception: - raise + assert ID in errors[1] + finally: + if DataError is not None: + exceptions.DataError = DataError + if InvalidData is not None: + exceptions.InvalidData = InvalidData def test_empty_queues_key(self): channel = self.channel @@ -1048,14 +780,13 @@ class TestRedisChannel(unittest.TestCase): key = channel.keyprefix_queue % 'celery' # Everything is fine, there is a list of queues. - list_of_queues = 'celery\x06\x16\x06\x16celery' - channel.client.sadd(key, list_of_queues) + channel.client.sadd(key, 'celery\x06\x16\x06\x16celery') assert channel.get_table('celery') == [ ('celery', '', 'celery'), ] # ... then for some reason, the _kombu.binding.celery key gets lost - channel.client.srem(key, list_of_queues) + channel.client.srem(key) # which raises a channel error so that the consumer/publisher # can recover by redeclaring the required entities. @@ -1068,7 +799,7 @@ class TestRedisChannel(unittest.TestCase): connparams = conn.default_channel._connparams() assert issubclass( connparams['connection_class'], - redis.UnixDomainSocketConnection, + redis.redis.UnixDomainSocketConnection, ) assert connparams['path'] == '/tmp/redis.sock' @@ -1095,7 +826,7 @@ class TestRedisChannel(unittest.TestCase): connparams = conn.default_channel._connparams() assert issubclass( connparams['connection_class'], - redis.SSLConnection, + redis.redis.SSLConnection, ) def test_rediss_connection(self): @@ -1104,16 +835,16 @@ class TestRedisChannel(unittest.TestCase): connparams = conn.default_channel._connparams() assert issubclass( connparams['connection_class'], - redis.SSLConnection, + redis.redis.SSLConnection, ) def test_sep_transport_option(self): - conn_kwargs = dict( - transport=FakeRedisKombuTransport, - transport_options={'sep': ':'}) - with Connection(**conn_kwargs) as conn: + with Connection(transport=Transport, transport_options={ + 'sep': ':', + }) as conn: key = conn.default_channel.keyprefix_queue % 'celery' conn.default_channel.client.sadd(key, 'celery::celery') + assert conn.default_channel.sep == ':' assert conn.default_channel.get_table('celery') == [ ('celery', '', 'celery'), @@ -1121,32 +852,71 @@ class TestRedisChannel(unittest.TestCase): @skip.unless_module('redis') -@mock.patch('redis.Connection', FakeRedisConnection) -class TestRedisConnections(unittest.TestCase): +class test_Redis: - def setUp(self): - self.connection = self.create_connection() - self.exchange_name = exchange_name = 'test_Redis' - self.exchange = Exchange(exchange_name, type='direct') + def setup(self): + self.connection = Connection(transport=Transport) + self.exchange = Exchange('test_Redis', type='direct') self.queue = Queue('test_Redis', self.exchange, 'test_Redis') - self.queue(self.connection.default_channel).declare() - self.real_scard = FakeRedisClient.scard - def tearDown(self): + def teardown(self): self.connection.close() - self.queue = None - self.exchange = None - global _fake_redis_client - _fake_redis_client = None - FakeRedisClient.scard = self.real_scard - def create_connection(self, **kwargs): - kwargs.setdefault('transport_options', {'fanout_patterns': True}) - return Connection(transport=FakeRedisKombuTransport, **kwargs) + @mock.replace_module_value(redis.redis, 'VERSION', [3, 0, 0]) + def test_publish__get_redispyv3(self): + channel = self.connection.channel() + producer = Producer(channel, self.exchange, routing_key='test_Redis') + self.queue(channel).declare() + + producer.publish({'hello': 'world'}) + + assert self.queue(channel).get().payload == {'hello': 'world'} + assert self.queue(channel).get() is None + assert self.queue(channel).get() is None + assert self.queue(channel).get() is None + + @mock.replace_module_value(redis.redis, 'VERSION', [2, 5, 10]) + def test_publish__get_redispyv2(self): + channel = self.connection.channel() + producer = Producer(channel, self.exchange, routing_key='test_Redis') + self.queue(channel).declare() + + producer.publish({'hello': 'world'}) + + assert self.queue(channel).get().payload == {'hello': 'world'} + assert self.queue(channel).get() is None + assert self.queue(channel).get() is None + assert self.queue(channel).get() is None + + def test_publish__consume(self): + connection = Connection(transport=Transport) + channel = connection.channel() + producer = Producer(channel, self.exchange, routing_key='test_Redis') + consumer = Consumer(channel, queues=[self.queue]) + + producer.publish({'hello2': 'world2'}) + _received = [] + + def callback(message_data, message): + _received.append(message_data) + message.ack() + + consumer.register_callback(callback) + consumer.consume() + + assert channel in channel.connection.cycle._channels + try: + connection.drain_events(timeout=1) + assert _received + with pytest.raises(socket.timeout): + connection.drain_events(timeout=0.01) + finally: + channel.close() def test_purge(self): - channel = self.connection.default_channel + channel = self.connection.channel() producer = Producer(channel, self.exchange, routing_key='test_Redis') + self.queue(channel).declare() for i in range(10): producer.publish({'hello': 'world-%s' % (i,)}) @@ -1157,38 +927,38 @@ class TestRedisConnections(unittest.TestCase): def test_db_values(self): Connection(virtual_host=1, - transport=FakeRedisKombuTransport).channel() + transport=Transport).channel() Connection(virtual_host='1', - transport=FakeRedisKombuTransport).channel() + transport=Transport).channel() Connection(virtual_host='/1', - transport=FakeRedisKombuTransport).channel() + transport=Transport).channel() with pytest.raises(Exception): Connection('redis:///foo').channel() def test_db_port(self): - c1 = Connection(port=None, transport=FakeRedisKombuTransport).channel() + c1 = Connection(port=None, transport=Transport).channel() c1.close() - c2 = Connection(port=9999, transport=FakeRedisKombuTransport).channel() + c2 = Connection(port=9999, transport=Transport).channel() c2.close() def test_close_poller_not_active(self): - c = Connection(transport=FakeRedisKombuTransport).channel() + c = Connection(transport=Transport).channel() cycle = c.connection.cycle c.client.connection c.close() assert c not in cycle._channels def test_close_ResponseError(self): - c = Connection(transport=FakeRedisKombuTransport).channel() + c = Connection(transport=Transport).channel() c.client.bgsave_raises_ResponseError = True c.close() def test_close_disconnects(self): - c = Connection(transport=FakeRedisKombuTransport).channel() + c = Connection(transport=Transport).channel() conn1 = c.client.connection conn2 = c.subclient.connection c.close() @@ -1202,133 +972,62 @@ class TestRedisConnections(unittest.TestCase): channel.close() def test_get_client(self): - conn = Connection(transport=FakeRedisKombuTransport) - chan = conn.channel() - assert chan.Client - assert chan.ResponseError - assert conn.transport.connection_errors - assert conn.transport.channel_errors + with mock.module_exists(*_redis_modules()): + conn = Connection(transport=Transport) + chan = conn.channel() + assert chan.Client + assert chan.ResponseError + assert conn.transport.connection_errors + assert conn.transport.channel_errors def test_check_at_least_we_try_to_connect_and_fail(self): + import redis connection = Connection('redis://localhost:65534/') with pytest.raises(redis.exceptions.ConnectionError): chan = connection.channel() chan._size('some_queue') - def test_redis_queue_lookup_happy_path(self): - fake_redis_client = _get_fake_redis_client() - redis_channel = self.connection.default_channel - routing_key = redis_channel.keyprefix_queue % self.exchange_name - redis_channel.queue_bind(routing_key, self.exchange_name, routing_key) - fake_redis_client.sadd(routing_key, routing_key) - lookup_queue_result = redis_channel._lookup( - exchange=self.exchange_name, - routing_key=routing_key, - default='default_queue') - assert lookup_queue_result == [routing_key] - - def test_redis_queue_lookup_gets_default_when_queue_doesnot_exist(self): - # Given: A test redis client and channel - fake_redis_client = _get_fake_redis_client() - redis_channel = self.connection.default_channel - # Given: The default queue is set: - default_queue = 'default_queue' - redis_channel.deadletter_queue = default_queue - # Determine the routing key - routing_key = redis_channel.keyprefix_queue % self.exchange - fake_redis_client.sadd(routing_key, "DoesNotExist") - lookup_queue_result = redis_channel._lookup( - exchange=self.exchange_name, - routing_key=routing_key, - default=default_queue) - assert lookup_queue_result == [redis_channel.deadletter_queue] - - def test_redis_queue_lookup_gets_queue_when_exchange_doesnot_exist(self): - # Given: A test redis client and channel - fake_redis_client = _get_fake_redis_client() - redis_channel = self.connection.default_channel - # Given: The default queue is set: - default_queue = 'default_queue' - redis_channel.deadletter_queue = default_queue - # Determine the routing key - routing_key = redis_channel.keyprefix_queue % self.exchange - fake_redis_client.sadd(routing_key, routing_key) - lookup_queue_result = redis_channel._lookup( - exchange=None, - routing_key=routing_key, - default=default_queue) - assert lookup_queue_result == [routing_key] - - def test_redis_queue_lookup_gets_default_when_route_doesnot_exist(self): - # Given: A test redis client and channel - fake_redis_client = _get_fake_redis_client() - redis_channel = self.connection.default_channel - # Given: The default queue is set: - default_queue = 'default_queue' - redis_channel.deadletter_queue = default_queue - # Determine the routing key - routing_key = redis_channel.keyprefix_queue % self.exchange - fake_redis_client.sadd(routing_key, "DoesNotExist") - lookup_queue_result = redis_channel._lookup( - exchange=None, - routing_key=None, - default=None) - assert lookup_queue_result == [default_queue] - - def test_redis_queue_lookup_raises_inconsistency_error(self): - connection = Mock(client=Mock( - hostname='127.0.0.1', - virtual_host='/', port=6379, - transport_options={}, - )) - redis_channel = FakeRedisKombuChannel(connection) - exchange = Mock(name='Exchange') - try: - redis_channel._lookup( - exchange=exchange.name, - routing_key='routing_key', - default=None) - except kombu_redis.InconsistencyError: - pass # This is expected - else: - raise("Redis test did not raise expected InconsistencyError!") - - def test_redis_queue_lookup_client_raises_key_error_gets_default(self): - fake_redis_client = _get_fake_redis_client() - fake_redis_client.scard = Mock(side_effect=KeyError) - redis_channel = self.connection.default_channel - routing_key = redis_channel.keyprefix_queue % self.exchange - redis_channel.queue_bind(routing_key, self.exchange_name, routing_key) - fake_redis_client.sadd(routing_key, routing_key) - default_queue_name = 'default_queue' - lookup_queue_result = redis_channel._lookup( - exchange=self.exchange_name, - routing_key=routing_key, - default=default_queue_name) - assert lookup_queue_result == [default_queue_name] - - def test_redis_queue_lookup_client_raises_key_error_gets_deadletter(self): - fake_redis_client = _get_fake_redis_client() - fake_redis_client.scard = Mock(side_effect=KeyError) - redis_channel = self.connection.default_channel - routing_key = redis_channel.keyprefix_queue % self.exchange - redis_channel.queue_bind(routing_key, self.exchange_name, routing_key) - fake_redis_client.sadd(routing_key, routing_key) - default_queue_name = 'deadletter_queue' - redis_channel.deadletter_queue = default_queue_name - lookup_queue_result = redis_channel._lookup( - exchange=self.exchange_name, - routing_key=routing_key, - default=None) - assert lookup_queue_result == [default_queue_name] + +def _redis_modules(): + + class ConnectionError(Exception): + pass + + class AuthenticationError(Exception): + pass + + class InvalidData(Exception): + pass + + class InvalidResponse(Exception): + pass + + class ResponseError(Exception): + pass + + exceptions = types.ModuleType(bytes_if_py2('redis.exceptions')) + exceptions.ConnectionError = ConnectionError + exceptions.AuthenticationError = AuthenticationError + exceptions.InvalidData = InvalidData + exceptions.InvalidResponse = InvalidResponse + exceptions.ResponseError = ResponseError + + class Redis(object): + pass + + myredis = types.ModuleType(bytes_if_py2('redis')) + myredis.exceptions = exceptions + myredis.Redis = Redis + + return myredis, exceptions @skip.unless_module('redis') -class TestKombuRedisMultiChannelPoller(unittest.TestCase): +class test_MultiChannelPoller: - def setUp(self): - self.Poller = kombu_redis.MultiChannelPoller + def setup(self): + self.Poller = redis.MultiChannelPoller def test_on_poll_start(self): p = self.Poller() @@ -1384,7 +1083,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): redis_ctx_mock.__exit__ = Mock() redis_ctx_mock.__enter__ = Mock(return_value=redis_client_mock) chan1.conn_or_acquire.return_value = redis_ctx_mock - qos = kombu_redis.QoS(chan1) + qos = redis.QoS(chan1) qos.visibility_timeout = timeout qos.restore_visible() redis_client_mock.zrevrangebyscore\ @@ -1398,17 +1097,17 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): chan.handlers = {'BRPOP': Mock(name='BRPOP')} chan.qos.can_consume.return_value = False - p.handle_event(13, kombu_redis.READ) + p.handle_event(13, redis.READ) chan.handlers['BRPOP'].assert_not_called() chan.qos.can_consume.return_value = True - p.handle_event(13, kombu_redis.READ) + p.handle_event(13, redis.READ) chan.handlers['BRPOP'].assert_called_with() - p.handle_event(13, kombu_redis.ERR) + p.handle_event(13, redis.ERR) chan._poll_error.assert_called_with('BRPOP') - p.handle_event(13, ~(kombu_redis.READ | kombu_redis.ERR)) + p.handle_event(13, ~(redis.READ | redis.ERR)) def test_fds(self): p = self.Poller() @@ -1535,12 +1234,12 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): def test_get_no_actions(self): p, channel = self.create_get() - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): p.get(Mock()) def test_qos_reject(self): p, channel = self.create_get() - qos = kombu_redis.QoS(channel) + qos = redis.QoS(channel) qos.ack = Mock(name='Qos.ack') qos.reject(1234) qos.ack.assert_called_with(1234) @@ -1549,7 +1248,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): p, channel = self.create_get(queues=['a_queue']) channel.qos.can_consume.return_value = True - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): p.get(Mock()) p._register_BRPOP.assert_called_with(channel) @@ -1558,7 +1257,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): p, channel = self.create_get(queues=['a_queue']) channel.qos.can_consume.return_value = False - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): p.get(Mock()) p._register_BRPOP.assert_not_called() @@ -1566,7 +1265,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): def test_get_listen(self): p, channel = self.create_get(fanouts=['f_queue']) - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): p.get(Mock()) p._register_LISTEN.assert_called_with(channel) @@ -1575,7 +1274,7 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): p, channel = self.create_get(events=[(1, eventio.ERR)]) p._fd_to_chan[1] = (channel, 'BRPOP') - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): p.get(Mock()) channel._poll_error.assert_called_with('BRPOP') @@ -1585,14 +1284,14 @@ class TestKombuRedisMultiChannelPoller(unittest.TestCase): (1, eventio.ERR)]) p._fd_to_chan[1] = (channel, 'BRPOP') - with pytest.raises(kombu_redis.Empty): + with pytest.raises(redis.Empty): p.get(Mock()) channel._poll_error.assert_called_with('BRPOP') @skip.unless_module('redis') -class TestKombuRedisMutex(unittest.TestCase): +class test_Mutex: def test_mutex(self, lock_id='xxx'): client = Mock(name='client') @@ -1604,13 +1303,13 @@ class TestKombuRedisMutex(unittest.TestCase): pipe = client.pipeline.return_value pipe.get.return_value = lock_id held = False - with kombu_redis.Mutex(client, 'foo1', 100): + with redis.Mutex(client, 'foo1', 100): held = True assert held client.setnx.assert_called_with('foo1', lock_id) pipe.get.return_value = 'yyy' held = False - with kombu_redis.Mutex(client, 'foo1', 100): + with redis.Mutex(client, 'foo1', 100): held = True assert held @@ -1618,93 +1317,35 @@ class TestKombuRedisMutex(unittest.TestCase): client.expire.reset_mock() pipe.get.return_value = lock_id client.setnx.return_value = False - with pytest.raises(kombu_redis.MutexHeld): + with pytest.raises(redis.MutexHeld): held = False - with kombu_redis.Mutex(client, 'foo1', '100'): + with redis.Mutex(client, 'foo1', '100'): held = True assert not held client.ttl.return_value = 0 - with pytest.raises(kombu_redis.MutexHeld): + with pytest.raises(redis.MutexHeld): held = False - with kombu_redis.Mutex(client, 'foo1', '100'): + with redis.Mutex(client, 'foo1', '100'): held = True assert not held client.expire.assert_called() # Wins but raises WatchError (and that is ignored) client.setnx.return_value = True - pipe.watch.side_effect = redis.WatchError() + pipe.watch.side_effect = redis.redis.WatchError() held = False - with kombu_redis.Mutex(client, 'foo1', 100): + with redis.Mutex(client, 'foo1', 100): held = True assert held -class TestRedisProducerConsumer(unittest.TestCase): - def setUp(self): - self.connection = self.create_connection() - self.channel = self.connection.default_channel - self.routing_key = routing_key = 'test_redis_producer' - self.exchange_name = exchange_name = 'test_redis_producer' - self.exchange = Exchange(exchange_name, type='direct') - self.queue = Queue(routing_key, self.exchange, routing_key) - - self.queue(self.connection.default_channel).declare() - self.channel.queue_bind(routing_key, self.exchange_name, routing_key) - - def create_connection(self, **kwargs): - kwargs.setdefault('transport_options', {'fanout_patterns': True}) - return Connection(transport=FakeRedisKombuTransportLite, **kwargs) - - def teardown(self): - self.connection.close() - - def test_publish__get(self): - channel = self.connection.channel() - producer = Producer(channel, self.exchange, - routing_key=self.routing_key) - self.queue(channel).declare() - - producer.publish({'hello': 'world'}) - - assert self.queue(channel).get().payload == {'hello': 'world'} - assert self.queue(channel).get() is None - assert self.queue(channel).get() is None - assert self.queue(channel).get() is None - - def test_publish__consume(self): - connection = self.create_connection() - channel = connection.default_channel - producer = Producer(channel, self.exchange, - routing_key=self.routing_key) - consumer = Consumer(channel, queues=[self.queue]) - - producer.publish({'hello2': 'world2'}) - _received = [] - - def callback(message_data, message): - _received.append(message_data) - message.ack() - - consumer.register_callback(callback) - consumer.consume() - - assert channel in channel.connection.cycle._channels - try: - connection.drain_events(timeout=1) - assert _received - with pytest.raises(socket.timeout): - connection.drain_events(timeout=0.01) - finally: - channel.close() - - @skip.unless_module('redis.sentinel') -class TestRedisSentinel(unittest.TestCase): +class test_RedisSentinel: def test_method_called(self): - with patch.object(kombu_redis.SentinelChannel, - '_sentinel_managed_pool') as p: + from kombu.transport.redis import SentinelChannel + + with patch.object(SentinelChannel, '_sentinel_managed_pool') as p: connection = Connection( 'sentinel://localhost:65534/', transport_options={ @@ -1768,11 +1409,13 @@ class TestRedisSentinel(unittest.TestCase): master_for().connection_pool.get_connection.assert_called() def test_can_create_connection(self): + from redis.exceptions import ConnectionError + connection = Connection( 'sentinel://localhost:65534/', transport_options={ 'master_name': 'not_important', }, ) - with pytest.raises(redis.exceptions.ConnectionError): + with pytest.raises(ConnectionError): connection.channel() |