summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-04-23 12:58:16 +0100
committerAsk Solem <ask@celeryproject.org>2012-04-23 12:58:16 +0100
commitd2c2d9d0e2935179fc7c01ff9b2a6ebc7f1e0856 (patch)
treea00014fdda3e4c7f884ccedd32eff14b590b2679
parentf187ccdbcf1618a7fe43d2a7375967c05ee92e89 (diff)
downloadkombu-d2c2d9d0e2935179fc7c01ff9b2a6ebc7f1e0856.tar.gz
Declared entities cache now stored with underlying connection
-rw-r--r--kombu/common.py5
-rw-r--r--kombu/connection.py18
-rw-r--r--kombu/tests/test_common.py8
3 files changed, 23 insertions, 8 deletions
diff --git a/kombu/common.py b/kombu/common.py
index f16c3883..807a4108 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -14,7 +14,7 @@ from __future__ import with_statement
import socket
import sys
-from collections import defaultdict, deque
+from collections import deque
from functools import partial
from itertools import count
@@ -28,7 +28,6 @@ __all__ = ["Broadcast", "entry_to_queue", "maybe_declare", "uuid",
"itermessages", "send_reply", "isend_reply",
"collect_replies", "insured", "ipublish"]
-declared_entities = defaultdict(lambda: set())
insured_logger = Log("kombu.insurance")
@@ -63,7 +62,7 @@ def maybe_declare(entity, channel, retry=False, **retry_policy):
def _maybe_declare(entity, channel):
- declared = declared_entities[channel.connection.client]
+ declared = channel.connection.client.declared_entities
if not entity.is_bound:
entity = entity.bind(channel)
if not entity.can_cache_declaration or entity not in declared:
diff --git a/kombu/connection.py b/kombu/connection.py
index e2596603..590148f4 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -81,6 +81,10 @@ class BrokerConnection(object):
uri_passthrough = set(["sqla", "sqlalchemy"])
uri_prefix = None
+ #: The cache of declared entities is per connection,
+ #: in case the server loses data.
+ declared_entities = None
+
def __init__(self, hostname="localhost", userid=None,
password=None, virtual_host=None, port=None, insist=False,
ssl=False, transport=None, connect_timeout=5,
@@ -116,6 +120,8 @@ class BrokerConnection(object):
if uri_prefix:
self.uri_prefix = uri_prefix
+ self.declared_entities = set()
+
def _init_params(self, hostname, userid, password, virtual_host, port,
insist, ssl, transport, connect_timeout, login_method):
self.hostname = hostname
@@ -166,7 +172,9 @@ class BrokerConnection(object):
except (self.connection_errors + self.channel_errors):
pass
- def _close(self):
+ def _do_close_self(self):
+ # Closes only the connection and channel(s) not transport.
+ self.declared_entities.clear()
if self._default_channel:
self.maybe_close_channel(self._default_channel)
if self._connection:
@@ -175,10 +183,13 @@ class BrokerConnection(object):
except self.connection_errors + (AttributeError, socket.error):
pass
self._connection = None
- self._debug("closed")
+
+ def _close(self):
+ self._do_close_self()
if self._transport:
self._transport.client = None
self._transport = None
+ self._debug("closed")
self._closed = True
def release(self):
@@ -268,7 +279,7 @@ class BrokerConnection(object):
raise
errback and errback(exc, 0)
self._connection = None
- self.close()
+ self._do_close_self()
remaining_retries = None
if max_retries is not None:
remaining_retries = max(max_retries - retries, 1)
@@ -558,6 +569,7 @@ class BrokerConnection(object):
"""
if not self._closed:
if not self.connected:
+ self.declared_entities.clear()
self._default_channel = None
self._connection = self._establish_connection()
self._closed = False
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index efa1a870..0ba44154 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -6,7 +6,7 @@ import socket
from mock import patch
from kombu import common
-from kombu.common import (Broadcast, maybe_declare, declared_entities,
+from kombu.common import (Broadcast, maybe_declare,
send_reply, isend_reply, collect_replies)
from .utils import TestCase
@@ -32,13 +32,15 @@ class test_maybe_declare(TestCase):
def test_cacheable(self):
channel = Mock()
+ client = channel.connection.client = Mock()
+ client.declared_entities = set()
entity = Mock()
entity.can_cache_declaration = True
entity.is_bound = True
maybe_declare(entity, channel)
self.assertEqual(entity.declare.call_count, 1)
- self.assertIn(entity, declared_entities[channel.connection.client])
+ self.assertIn(entity, channel.connection.client.declared_entities)
maybe_declare(entity, channel)
self.assertEqual(entity.declare.call_count, 1)
@@ -57,6 +59,7 @@ class test_maybe_declare(TestCase):
def test_binds_entities(self):
channel = Mock()
+ channel.connection.client.declared_entities = set()
entity = Mock()
entity.can_cache_declaration = True
entity.is_bound = False
@@ -84,6 +87,7 @@ class test_replies(TestCase):
exchange = Mock()
exchange.is_bound = True
producer = Mock()
+ producer.channel.connection.client.declared_entities = set()
send_reply(exchange, req, {"hello": "world"}, producer)
self.assertTrue(producer.publish.call_count)