diff options
-rw-r--r-- | kombu/common.py | 45 | ||||
-rw-r--r-- | kombu/connection.py | 1 | ||||
-rw-r--r-- | kombu/entity.py | 2 | ||||
-rw-r--r-- | kombu/messaging.py | 2 | ||||
-rw-r--r-- | kombu/tests/test_common.py | 14 | ||||
-rw-r--r-- | kombu/tests/test_entities.py | 2 | ||||
-rw-r--r-- | kombu/tests/test_messaging.py | 2 |
7 files changed, 38 insertions, 30 deletions
diff --git a/kombu/common.py b/kombu/common.py index 1a6e587b..799d0b64 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -22,7 +22,6 @@ from amqp import RecoverableConnectionError from .entity import Exchange, Queue from .five import range from .log import get_logger -from .messaging import Consumer as _Consumer from .serialization import registry as serializers from .utils import uuid @@ -91,33 +90,39 @@ def declaration_cached(entity, channel): def maybe_declare(entity, channel=None, retry=False, **retry_policy): - if not entity.is_bound: - assert channel - entity = entity.bind(channel) + is_bound = entity.is_bound + + if channel is None: + assert is_bound + channel = entity.channel + + declared = ident = None + if channel.connection and entity.can_cache_declaration: + declared = channel.connection.client.declared_entities + ident = hash(entity) + if ident in declared: + return False + + entity = entity if is_bound else entity.bind(channel) if retry: - return _imaybe_declare(entity, **retry_policy) - return _maybe_declare(entity) + return _imaybe_declare(entity, declared, ident, channel, **retry_policy) + return _maybe_declare(entity, declared, ident, channel) -def _maybe_declare(entity): - channel = entity.channel +def _maybe_declare(entity, declared, ident, channel): + channel = channel or entity.channel if not channel.connection: raise RecoverableConnectionError('channel disconnected') - if entity.can_cache_declaration: - declared = channel.connection.client.declared_entities - ident = hash(entity) - if ident not in declared: - entity.declare() - declared.add(ident) - return True - return False entity.declare() + if declared is not None and ident: + declared.add(ident) return True -def _imaybe_declare(entity, **retry_policy): +def _imaybe_declare(entity, declared, ident, channel, **retry_policy): return entity.channel.connection.client.ensure( - entity, _maybe_declare, **retry_policy)(entity) + entity, _maybe_declare, **retry_policy)( + entity, declared, ident, channel) def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): @@ -138,8 +143,8 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): def itermessages(conn, channel, queue, limit=1, timeout=None, - Consumer=_Consumer, callbacks=None, **kwargs): - return drain_consumer(Consumer(channel, queues=[queue], **kwargs), + callbacks=None, **kwargs): + return drain_consumer(conn.Consumer(channel, queues=[queue], **kwargs), limit=limit, timeout=timeout, callbacks=callbacks) diff --git a/kombu/connection.py b/kombu/connection.py index 63b4b685..b18d67d3 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -194,6 +194,7 @@ class Connection(object): """Switch connection parameters to use a new URL (does not reconnect)""" self.close() + self.declared_entities.clear() self._closed = False self._init_params(**dict(self._initial_params, **parse_url(url))) diff --git a/kombu/entity.py b/kombu/entity.py index fda53bef..3856a706 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -672,7 +672,7 @@ class Queue(MaybeChannelBound): @property def can_cache_declaration(self): - return self.durable and not self.auto_delete + return True @classmethod def from_dict(self, queue, **options): diff --git a/kombu/messaging.py b/kombu/messaging.py index 98d59d45..8b923950 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -11,6 +11,7 @@ import numbers from itertools import count +from .common import maybe_declare from .compression import compress from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES @@ -107,7 +108,6 @@ class Producer(object): """Declare the exchange if it hasn't already been declared during this session.""" if entity: - from .common import maybe_declare return maybe_declare(entity, self.channel, retry, **retry_policy) def publish(self, body, routing_key=None, delivery_mode=None, diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 34406992..c4eebb71 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -105,6 +105,8 @@ class test_maybe_declare(Case): def test_with_retry(self): channel = Mock() + client = channel.connection.client = Mock() + client.declared_entities = set() entity = Mock() entity.can_cache_declaration = True entity.is_bound = True @@ -265,8 +267,8 @@ class test_itermessages(Case): conn = self.MockConnection() channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) ret = next(it) self.assertTupleEqual(ret, ('body', 'message')) @@ -279,8 +281,8 @@ class test_itermessages(Case): conn.should_raise_timeout = True channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) with self.assertRaises(StopIteration): next(it) @@ -291,8 +293,8 @@ class test_itermessages(Case): deque_instance.popleft.side_effect = IndexError() conn = self.MockConnection() channel = Mock() - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) with self.assertRaises(StopIteration): next(it) diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 165160f1..0ce359d4 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -285,7 +285,7 @@ class test_Queue(Case): def test_can_cache_declaration(self): self.assertTrue(Queue('a', durable=True).can_cache_declaration) - self.assertFalse(Queue('a', durable=False).can_cache_declaration) + self.assertTrue(Queue('a', durable=False).can_cache_declaration) def test_eq(self): q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx') diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 877d36cc..6bcf89b3 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -36,7 +36,7 @@ class test_Producer(Case): p = Producer(None) self.assertFalse(p._channel) - @patch('kombu.common.maybe_declare') + @patch('kombu.messaging.maybe_declare') def test_maybe_declare(self, maybe_declare): p = self.connection.Producer() q = Queue('foo') |