diff options
author | Ask Solem <ask@celeryproject.org> | 2012-04-23 12:58:16 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-04-23 12:58:16 +0100 |
commit | d2c2d9d0e2935179fc7c01ff9b2a6ebc7f1e0856 (patch) | |
tree | a00014fdda3e4c7f884ccedd32eff14b590b2679 | |
parent | f187ccdbcf1618a7fe43d2a7375967c05ee92e89 (diff) | |
download | kombu-d2c2d9d0e2935179fc7c01ff9b2a6ebc7f1e0856.tar.gz |
Declared entities cache now stored with underlying connection
-rw-r--r-- | kombu/common.py | 5 | ||||
-rw-r--r-- | kombu/connection.py | 18 | ||||
-rw-r--r-- | kombu/tests/test_common.py | 8 |
3 files changed, 23 insertions, 8 deletions
diff --git a/kombu/common.py b/kombu/common.py index f16c3883..807a4108 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -14,7 +14,7 @@ from __future__ import with_statement import socket import sys -from collections import defaultdict, deque +from collections import deque from functools import partial from itertools import count @@ -28,7 +28,6 @@ __all__ = ["Broadcast", "entry_to_queue", "maybe_declare", "uuid", "itermessages", "send_reply", "isend_reply", "collect_replies", "insured", "ipublish"] -declared_entities = defaultdict(lambda: set()) insured_logger = Log("kombu.insurance") @@ -63,7 +62,7 @@ def maybe_declare(entity, channel, retry=False, **retry_policy): def _maybe_declare(entity, channel): - declared = declared_entities[channel.connection.client] + declared = channel.connection.client.declared_entities if not entity.is_bound: entity = entity.bind(channel) if not entity.can_cache_declaration or entity not in declared: diff --git a/kombu/connection.py b/kombu/connection.py index e2596603..590148f4 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -81,6 +81,10 @@ class BrokerConnection(object): uri_passthrough = set(["sqla", "sqlalchemy"]) uri_prefix = None + #: The cache of declared entities is per connection, + #: in case the server loses data. + declared_entities = None + def __init__(self, hostname="localhost", userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, @@ -116,6 +120,8 @@ class BrokerConnection(object): if uri_prefix: self.uri_prefix = uri_prefix + self.declared_entities = set() + def _init_params(self, hostname, userid, password, virtual_host, port, insist, ssl, transport, connect_timeout, login_method): self.hostname = hostname @@ -166,7 +172,9 @@ class BrokerConnection(object): except (self.connection_errors + self.channel_errors): pass - def _close(self): + def _do_close_self(self): + # Closes only the connection and channel(s) not transport. + self.declared_entities.clear() if self._default_channel: self.maybe_close_channel(self._default_channel) if self._connection: @@ -175,10 +183,13 @@ class BrokerConnection(object): except self.connection_errors + (AttributeError, socket.error): pass self._connection = None - self._debug("closed") + + def _close(self): + self._do_close_self() if self._transport: self._transport.client = None self._transport = None + self._debug("closed") self._closed = True def release(self): @@ -268,7 +279,7 @@ class BrokerConnection(object): raise errback and errback(exc, 0) self._connection = None - self.close() + self._do_close_self() remaining_retries = None if max_retries is not None: remaining_retries = max(max_retries - retries, 1) @@ -558,6 +569,7 @@ class BrokerConnection(object): """ if not self._closed: if not self.connected: + self.declared_entities.clear() self._default_channel = None self._connection = self._establish_connection() self._closed = False diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index efa1a870..0ba44154 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -6,7 +6,7 @@ import socket from mock import patch from kombu import common -from kombu.common import (Broadcast, maybe_declare, declared_entities, +from kombu.common import (Broadcast, maybe_declare, send_reply, isend_reply, collect_replies) from .utils import TestCase @@ -32,13 +32,15 @@ class test_maybe_declare(TestCase): def test_cacheable(self): channel = Mock() + client = channel.connection.client = Mock() + client.declared_entities = set() entity = Mock() entity.can_cache_declaration = True entity.is_bound = True maybe_declare(entity, channel) self.assertEqual(entity.declare.call_count, 1) - self.assertIn(entity, declared_entities[channel.connection.client]) + self.assertIn(entity, channel.connection.client.declared_entities) maybe_declare(entity, channel) self.assertEqual(entity.declare.call_count, 1) @@ -57,6 +59,7 @@ class test_maybe_declare(TestCase): def test_binds_entities(self): channel = Mock() + channel.connection.client.declared_entities = set() entity = Mock() entity.can_cache_declaration = True entity.is_bound = False @@ -84,6 +87,7 @@ class test_replies(TestCase): exchange = Mock() exchange.is_bound = True producer = Mock() + producer.channel.connection.client.declared_entities = set() send_reply(exchange, req, {"hello": "world"}, producer) self.assertTrue(producer.publish.call_count) |