summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-06-05 16:04:50 +0100
committerAsk Solem <ask@celeryproject.org>2014-06-05 16:16:55 +0100
commitd1b017b99c0465a716d711f01eefdf58eca8e8aa (patch)
tree0d72e3d5bca33b2cb0a369fd9718fc427fd05365
parent19e83733c26d907522616e3ab3c7e3b323eaf50e (diff)
downloadkombu-d1b017b99c0465a716d711f01eefdf58eca8e8aa.tar.gz
Optimizes maybe_declare for transient queues
-rw-r--r--kombu/common.py45
-rw-r--r--kombu/connection.py1
-rw-r--r--kombu/entity.py2
-rw-r--r--kombu/messaging.py2
-rw-r--r--kombu/tests/test_common.py14
-rw-r--r--kombu/tests/test_entities.py2
-rw-r--r--kombu/tests/test_messaging.py2
7 files changed, 38 insertions, 30 deletions
diff --git a/kombu/common.py b/kombu/common.py
index 1a6e587b..799d0b64 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -22,7 +22,6 @@ from amqp import RecoverableConnectionError
from .entity import Exchange, Queue
from .five import range
from .log import get_logger
-from .messaging import Consumer as _Consumer
from .serialization import registry as serializers
from .utils import uuid
@@ -91,33 +90,39 @@ def declaration_cached(entity, channel):
def maybe_declare(entity, channel=None, retry=False, **retry_policy):
- if not entity.is_bound:
- assert channel
- entity = entity.bind(channel)
+ is_bound = entity.is_bound
+
+ if channel is None:
+ assert is_bound
+ channel = entity.channel
+
+ declared = ident = None
+ if channel.connection and entity.can_cache_declaration:
+ declared = channel.connection.client.declared_entities
+ ident = hash(entity)
+ if ident in declared:
+ return False
+
+ entity = entity if is_bound else entity.bind(channel)
if retry:
- return _imaybe_declare(entity, **retry_policy)
- return _maybe_declare(entity)
+ return _imaybe_declare(entity, declared, ident, channel, **retry_policy)
+ return _maybe_declare(entity, declared, ident, channel)
-def _maybe_declare(entity):
- channel = entity.channel
+def _maybe_declare(entity, declared, ident, channel):
+ channel = channel or entity.channel
if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
- if entity.can_cache_declaration:
- declared = channel.connection.client.declared_entities
- ident = hash(entity)
- if ident not in declared:
- entity.declare()
- declared.add(ident)
- return True
- return False
entity.declare()
+ if declared is not None and ident:
+ declared.add(ident)
return True
-def _imaybe_declare(entity, **retry_policy):
+def _imaybe_declare(entity, declared, ident, channel, **retry_policy):
return entity.channel.connection.client.ensure(
- entity, _maybe_declare, **retry_policy)(entity)
+ entity, _maybe_declare, **retry_policy)(
+ entity, declared, ident, channel)
def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
@@ -138,8 +143,8 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
def itermessages(conn, channel, queue, limit=1, timeout=None,
- Consumer=_Consumer, callbacks=None, **kwargs):
- return drain_consumer(Consumer(channel, queues=[queue], **kwargs),
+ callbacks=None, **kwargs):
+ return drain_consumer(conn.Consumer(channel, queues=[queue], **kwargs),
limit=limit, timeout=timeout, callbacks=callbacks)
diff --git a/kombu/connection.py b/kombu/connection.py
index 63b4b685..b18d67d3 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -194,6 +194,7 @@ class Connection(object):
"""Switch connection parameters to use a new URL (does not
reconnect)"""
self.close()
+ self.declared_entities.clear()
self._closed = False
self._init_params(**dict(self._initial_params, **parse_url(url)))
diff --git a/kombu/entity.py b/kombu/entity.py
index fda53bef..3856a706 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -672,7 +672,7 @@ class Queue(MaybeChannelBound):
@property
def can_cache_declaration(self):
- return self.durable and not self.auto_delete
+ return True
@classmethod
def from_dict(self, queue, **options):
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 98d59d45..8b923950 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -11,6 +11,7 @@ import numbers
from itertools import count
+from .common import maybe_declare
from .compression import compress
from .connection import maybe_channel, is_connection
from .entity import Exchange, Queue, DELIVERY_MODES
@@ -107,7 +108,6 @@ class Producer(object):
"""Declare the exchange if it hasn't already been declared
during this session."""
if entity:
- from .common import maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
def publish(self, body, routing_key=None, delivery_mode=None,
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 34406992..c4eebb71 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -105,6 +105,8 @@ class test_maybe_declare(Case):
def test_with_retry(self):
channel = Mock()
+ client = channel.connection.client = Mock()
+ client.declared_entities = set()
entity = Mock()
entity.can_cache_declaration = True
entity.is_bound = True
@@ -265,8 +267,8 @@ class test_itermessages(Case):
conn = self.MockConnection()
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
ret = next(it)
self.assertTupleEqual(ret, ('body', 'message'))
@@ -279,8 +281,8 @@ class test_itermessages(Case):
conn.should_raise_timeout = True
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
with self.assertRaises(StopIteration):
next(it)
@@ -291,8 +293,8 @@ class test_itermessages(Case):
deque_instance.popleft.side_effect = IndexError()
conn = self.MockConnection()
channel = Mock()
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
with self.assertRaises(StopIteration):
next(it)
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index 165160f1..0ce359d4 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -285,7 +285,7 @@ class test_Queue(Case):
def test_can_cache_declaration(self):
self.assertTrue(Queue('a', durable=True).can_cache_declaration)
- self.assertFalse(Queue('a', durable=False).can_cache_declaration)
+ self.assertTrue(Queue('a', durable=False).can_cache_declaration)
def test_eq(self):
q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 877d36cc..6bcf89b3 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -36,7 +36,7 @@ class test_Producer(Case):
p = Producer(None)
self.assertFalse(p._channel)
- @patch('kombu.common.maybe_declare')
+ @patch('kombu.messaging.maybe_declare')
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
q = Queue('foo')