summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-11-14 16:46:08 +0000
committerAsk Solem <ask@celeryproject.org>2012-11-15 12:54:34 +0000
commit325fcc0522571096b424080c232f0d87a1e5865e (patch)
treeb483ac6cf52a1885890af0fe550e0dfb2f6afd24
parent4160f35da03a2c684e678674283f2a08202882f6 (diff)
downloadkombu-325fcc0522571096b424080c232f0d87a1e5865e.tar.gz
Producer: Support lazy channels using ChannelPromise, and connections will always be treated lazily (though, auto_declare will force evaluation, so a good idea to stop using it)
-rw-r--r--kombu/abstract.py8
-rw-r--r--kombu/connection.py20
-rw-r--r--kombu/messaging.py103
-rw-r--r--kombu/pools.py7
-rw-r--r--kombu/tests/mocks.py8
-rw-r--r--kombu/tests/test_messaging.py9
-rw-r--r--kombu/tests/test_pools.py5
-rw-r--r--kombu/transport/amqplib.py6
-rw-r--r--kombu/transport/pika.py4
-rw-r--r--kombu/transport/pyamqp.py4
-rw-r--r--kombu/transport/virtual/__init__.py4
-rw-r--r--kombu/utils/__init__.py16
12 files changed, 128 insertions, 66 deletions
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 0d23dc7e..f0968c1a 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -15,6 +15,7 @@ from copy import copy
from .connection import maybe_channel
from .exceptions import NotBoundError
from .five import items
+from .utils import ChannelPromise
__all__ = ['Object', 'MaybeChannelBound']
@@ -111,8 +112,11 @@ class MaybeChannelBound(Object):
@property
def channel(self):
"""Current channel if the object is bound."""
- if self._channel is None:
+ channel = self._channel
+ if channel is None:
raise NotBoundError(
"Can't call method on {0} not bound to a channel".format(
type(self).__name__))
- return self._channel
+ if isinstance(channel, ChannelPromise):
+ channel = self._channel = channel()
+ return channel
diff --git a/kombu/connection.py b/kombu/connection.py
index dab942df..e23be740 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -33,20 +33,23 @@ from .utils import cached_property, retry_over_time, shufflecycle
from .utils.compat import OrderedDict
from .utils.url import parse_url
+__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']
+
RESOLVE_ALIASES = {'pyamqp': 'amqp',
'librabbitmq': 'amqp'}
_LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False)
_LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False)
-__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']
+#: List of URI schemes that should not be parsed, but sent
+#: directly to the transport instead.
URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy', 'zeromq', 'zmq'])
logger = get_logger(__name__)
roundrobin_failover = cycle
failover_strategies = {
- 'round-robin': cycle,
+ 'round-robin': roundrobin_failover,
'shuffle': shufflecycle,
}
@@ -100,12 +103,12 @@ class Connection(object):
.. note::
The connection is established lazily when needed. If you need the
- connection to be established, then force it to do so using
+ connection to be established, then force it by calling
:meth:`connect`::
>>> conn.connect()
- Remember to always close the connection::
+ and always remember to close the connection::
>>> conn.release()
@@ -133,8 +136,8 @@ class Connection(object):
#: of connection failure (initialized by :attr:`failover_strategy`).
cycle = None
- #: Additional transport specific options, passed on to the transport
- #: instance.
+ #: Additional transport specific options,
+ #: passed on to the transport instance.
transport_options = None
#: Strategy used to select new hosts when reconnecting after connection
@@ -1019,7 +1022,6 @@ class ChannelPool(Resource):
def prepare(self, channel):
if isinstance(channel, Callable):
channel = channel()
-
return channel
@@ -1029,3 +1031,7 @@ def maybe_channel(channel):
if isinstance(channel, Connection):
return channel.default_channel
return channel
+
+
+def is_connection(obj):
+ return isinstance(obj, Connection)
diff --git a/kombu/messaging.py b/kombu/messaging.py
index a05d9847..7b16628a 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -13,11 +13,11 @@ from __future__ import absolute_import
from itertools import count
from .compression import compress
-from .connection import maybe_channel
-from .entity import Exchange, Queue
-from .five import text_t, values
+from .connection import maybe_channel, is_connection
+from .entity import Exchange, Queue, DELIVERY_MODES
+from .five import int_types, text_t, values
from .serialization import encode
-from .utils import maybe_list
+from .utils import ChannelPromise, maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
@@ -43,13 +43,11 @@ class Producer(object):
Note that the producer needs to drain events to use this feature.
"""
- #: The connection channel used.
- channel = None
- #: Default exchange.
+ #: Default exchange
exchange = None
- # Default routing key.
+ #: Default routing key.
routing_key = ''
#: Default serializer to use. Default is JSON.
@@ -66,22 +64,27 @@ class Producer(object):
#: Basic return callback.
on_return = None
+ #: Set if channel argument was a Connection instance (using
+ #: default_channel).
+ __connection__ = None
+
def __init__(self, channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None, compression=None,
on_return=None):
- self.channel = channel
- self.exchange = exchange or self.exchange
- if self.exchange is None:
- self.exchange = Exchange('')
+ self._channel = channel
+ self.exchange = exchange
self.routing_key = routing_key or self.routing_key
self.serializer = serializer or self.serializer
self.compression = compression or self.compression
self.on_return = on_return or self.on_return
+ self._channel_promise = None
+ if self.exchange is None:
+ self.exchange = Exchange('')
if auto_declare is not None:
self.auto_declare = auto_declare
- if self.channel:
- self.revive(self.channel)
+ if self._channel:
+ self.revive(self._channel)
def __repr__(self):
return '<Producer: {0.channel}>'.format(self)
@@ -145,44 +148,74 @@ class Producer(object):
retry_policy = {} if retry_policy is None else retry_policy
routing_key = self.routing_key if routing_key is None else routing_key
compression = self.compression if compression is None else compression
+ exchange = exchange or self.exchange
if isinstance(exchange, Exchange):
+ delivery_mode = delivery_mode or exchange.delivery_mode
exchange = exchange.name
+ else:
+ delivery_mode = delivery_mode or self.exchange.delivery_mode
+ if not isinstance(delivery_mode, int_types):
+ delivery_mode = DELIVERY_MODES[delivery_mode]
+ properties['delivery_mode'] = delivery_mode
body, content_type, content_encoding = self._prepare(
body, serializer, content_type, content_encoding,
compression, headers)
- message = self.exchange.Message(body,
- delivery_mode,
- priority,
- content_type,
- content_encoding,
- headers=headers,
- properties=properties)
+
publish = self._publish
if retry:
publish = self.connection.ensure(self, publish, **retry_policy)
- return publish(message, routing_key, mandatory,
- immediate, exchange, declare)
-
- def _publish(self, message, routing_key, mandatory, immediate, exchange,
- declare):
+ return publish(body, priority, content_type,
+ content_encoding, headers, properties,
+ routing_key, mandatory, immediate, exchange, declare)
+
+ def _publish(self, body, priority, content_type, content_encoding,
+ headers, properties, routing_key, mandatory,
+ immediate, exchange, declare):
+ channel = self.channel
+ message = channel.prepare_message(
+ body, priority, content_type,
+ content_encoding, headers, properties,
+ )
if declare:
maybe_declare = self.maybe_declare
[maybe_declare(entity) for entity in declare]
- return self.exchange.publish(message, routing_key,
- mandatory, immediate, exchange)
+ return channel.basic_publish(message,
+ exchange=exchange, routing_key=routing_key,
+ mandatory=mandatory, immediate=immediate,
+ )
+
+ def _get_channel(self):
+ channel = self._channel
+ if isinstance(channel, ChannelPromise):
+ channel = self._channel = channel()
+ self.exchange.revive(channel)
+ if self.on_return:
+ channel.events['basic_return'].add(self.on_return)
+ return channel
+
+ def _set_channel(self, channel):
+ self._channel = channel
+ channel = property(_get_channel, _set_channel)
def revive(self, channel):
"""Revive the producer after connection loss."""
- channel = self.channel = maybe_channel(channel)
- self.exchange = self.exchange(channel)
- self.exchange.revive(channel)
-
+ if is_connection(channel):
+ promise = ChannelPromise(lambda: channel.default_channel)
+ self.__connection__ = channel
+ self._channel = promise
+ self.exchange = self.exchange(promise)
+ else:
+ # Channel already concrete
+ self._channel = channel
+ if self.on_return:
+ self._channel.events['basic_return'].add(self.on_return)
+ self.exchange = self.exchange(channel)
if self.auto_declare:
+ # auto_decare is not recommended as this will force
+ # evaluation of the channel.
self.declare()
- if self.on_return:
- self.channel.events['basic_return'].append(self.on_return)
def __enter__(self):
return self
@@ -224,7 +257,7 @@ class Producer(object):
@property
def connection(self):
try:
- return self.channel.connection.client
+ return self.__connection__ or self.channel.connection.client
except AttributeError:
pass
diff --git a/kombu/pools.py b/kombu/pools.py
index 968a7a49..0cd03c41 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -55,13 +55,12 @@ class ProducerPool(Resource):
if isinstance(p, Callable):
p = p()
if not p.channel:
- connection = self._acquire_connection()
- p.revive(connection.default_channel)
+ p.revive(self._acquire_connection())
return p
def release(self, resource):
- if resource.connection:
- resource.connection.release()
+ if resource.__connection__:
+ resource.__connection__.release()
resource.channel = None
super(ProducerPool, self).release(resource)
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index a68431c4..6e1be5eb 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -28,7 +28,7 @@ class Channel(base.StdChannel):
self.called = []
self.deliveries = count(1)
self.to_deliver = []
- self.events = {'basic_return': []}
+ self.events = {'basic_return': set()}
def _called(self, name):
self.called.append(name)
@@ -39,10 +39,10 @@ class Channel(base.StdChannel):
def exchange_declare(self, *args, **kwargs):
self._called('exchange_declare')
- def prepare_message(self, message_data, properties={}, priority=0,
- content_type=None, content_encoding=None, headers=None):
+ def prepare_message(self, body, priority=0, content_type=None,
+ content_encoding=None, headers=None, properties={}):
self._called('prepare_message')
- return dict(body=message_data,
+ return dict(body=body,
headers=headers,
properties=properties,
priority=priority,
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 51991909..4c5fe619 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -108,16 +108,18 @@ class test_Producer(TestCase):
def test_publish_with_Exchange_instance(self):
p = self.connection.Producer()
- p.exchange.publish = Mock()
+ p.channel = Mock()
p.publish('hello', exchange=Exchange('foo'))
- self.assertEqual(p.exchange.publish.call_args[0][4], 'foo')
+ self.assertEqual(
+ p._channel.basic_publish.call_args[1]['exchange'], 'foo',
+ )
def test_publish_retry_with_declare(self):
p = self.connection.Producer()
p.maybe_declare = Mock()
p.connection.ensure = Mock()
ex = Exchange('foo')
- p._publish('hello', 'rk', 0, 0, ex, declare=[ex])
+ p._publish('hello', 0, '', '', {}, {}, 'rk', 0, 0, ex, declare=[ex])
p.maybe_declare.assert_called_with(ex)
def test_revive_when_channel_is_connection(self):
@@ -141,6 +143,7 @@ class test_Producer(TestCase):
def test_connection_property_handles_AttributeError(self):
p = self.connection.Producer()
p.channel = object()
+ p.__connection__ = None
self.assertIsNone(p.connection)
def test_publish(self):
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index 0c667f8e..32d1294f 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -61,7 +61,7 @@ class test_ProducerPool(TestCase):
first = pool._resource.get_nowait()
producer = pool.prepare(first)
self.assertTrue(self.connections.acquire.called)
- producer.revive.assert_called_with(connection.default_channel)
+ producer.revive.assert_called_with(connection)
def test_prepare_channel_already_created(self):
self.connections.acquire.return_value = Mock()
@@ -79,8 +79,9 @@ class test_ProducerPool(TestCase):
def test_release(self):
p = Mock()
p.channel = Mock()
+ p.__connection__ = Mock()
self.pool.release(p)
- p.connection.release.assert_called_with()
+ p.__connection__.release.assert_called_with()
self.assertIsNone(p.channel)
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index 14748859..9f9f467d 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -264,17 +264,17 @@ class Message(base.Message):
class Channel(_Channel, base.StdChannel):
Message = Message
- events = {'basic_return': []}
+ events = {'basic_return': set()}
def __init__(self, *args, **kwargs):
self.no_ack_consumers = set()
super(Channel, self).__init__(*args, **kwargs)
- def prepare_message(self, message_data, priority=None,
+ def prepare_message(self, body, priority=None,
content_type=None, content_encoding=None, headers=None,
properties=None):
"""Encapsulate data into a AMQP message."""
- return amqp.Message(message_data, priority=priority,
+ return amqp.Message(body, priority=priority,
content_type=content_type,
content_encoding=content_encoding,
application_headers=headers,
diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py
index 5c80724e..ce60dd6c 100644
--- a/kombu/transport/pika.py
+++ b/kombu/transport/pika.py
@@ -109,7 +109,7 @@ class Channel(channel.Channel, base.StdChannel):
queue, no_ack,
False, consumer_tag)
- def prepare_message(self, message_data, priority=None,
+ def prepare_message(self, body, priority=None,
content_type=None, content_encoding=None, headers=None,
properties=None):
properties = BasicProperties(priority=priority,
@@ -117,7 +117,7 @@ class Channel(channel.Channel, base.StdChannel):
content_encoding=content_encoding,
headers=headers,
**properties)
- return message_data, properties
+ return body, properties
def message_to_python(self, raw_message):
return self.Message(channel=self, amqp_message=raw_message)
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index 1274be11..d03178e0 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -45,11 +45,11 @@ class Message(base.Message):
class Channel(amqp.Channel, base.StdChannel):
Message = Message
- def prepare_message(self, message_data, priority=None,
+ def prepare_message(self, body, priority=None,
content_type=None, content_encoding=None, headers=None,
properties=None):
"""Encapsulate data into a AMQP message."""
- return amqp.Message(message_data, priority=priority,
+ return amqp.Message(body, priority=priority,
content_type=content_type,
content_encoding=content_encoding,
application_headers=headers,
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 91ee64dc..4bf13f85 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -595,7 +595,7 @@ class Channel(AbstractChannel, base.StdChannel):
return self.Message(self, payload=raw_message)
return raw_message
- def prepare_message(self, message_data, priority=None,
+ def prepare_message(self, body, priority=None,
content_type=None, content_encoding=None, headers=None,
properties=None):
"""Prepare message data."""
@@ -603,7 +603,7 @@ class Channel(AbstractChannel, base.StdChannel):
info = properties.setdefault('delivery_info', {})
info['priority'] = priority or 0
- return {'body': message_data,
+ return {'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 0562cb5c..e18cc269 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -384,3 +384,19 @@ def entrypoints(namespace):
except ImportError:
return iter([])
return ((ep, ep.load()) for ep in iter_entry_points(namespace))
+
+
+class ChannelPromise(object):
+
+ def __init__(self, contract):
+ self.__contract__ = contract
+
+ def __call__(self):
+ try:
+ return self.__value__
+ except AttributeError:
+ value = self.__value__ = self.__contract__()
+ return value
+
+ def __repr__(self):
+ return '<promise: %r>' % (self(), )