diff options
author | Ask Solem <ask@celeryproject.org> | 2012-11-14 16:46:08 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-11-15 12:54:34 +0000 |
commit | 325fcc0522571096b424080c232f0d87a1e5865e (patch) | |
tree | b483ac6cf52a1885890af0fe550e0dfb2f6afd24 | |
parent | 4160f35da03a2c684e678674283f2a08202882f6 (diff) | |
download | kombu-325fcc0522571096b424080c232f0d87a1e5865e.tar.gz |
Producer: Support lazy channels using ChannelPromise, and connections will always be treated lazily (though, auto_declare will force evaluation, so a good idea to stop using it)
-rw-r--r-- | kombu/abstract.py | 8 | ||||
-rw-r--r-- | kombu/connection.py | 20 | ||||
-rw-r--r-- | kombu/messaging.py | 103 | ||||
-rw-r--r-- | kombu/pools.py | 7 | ||||
-rw-r--r-- | kombu/tests/mocks.py | 8 | ||||
-rw-r--r-- | kombu/tests/test_messaging.py | 9 | ||||
-rw-r--r-- | kombu/tests/test_pools.py | 5 | ||||
-rw-r--r-- | kombu/transport/amqplib.py | 6 | ||||
-rw-r--r-- | kombu/transport/pika.py | 4 | ||||
-rw-r--r-- | kombu/transport/pyamqp.py | 4 | ||||
-rw-r--r-- | kombu/transport/virtual/__init__.py | 4 | ||||
-rw-r--r-- | kombu/utils/__init__.py | 16 |
12 files changed, 128 insertions, 66 deletions
diff --git a/kombu/abstract.py b/kombu/abstract.py index 0d23dc7e..f0968c1a 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -15,6 +15,7 @@ from copy import copy from .connection import maybe_channel from .exceptions import NotBoundError from .five import items +from .utils import ChannelPromise __all__ = ['Object', 'MaybeChannelBound'] @@ -111,8 +112,11 @@ class MaybeChannelBound(Object): @property def channel(self): """Current channel if the object is bound.""" - if self._channel is None: + channel = self._channel + if channel is None: raise NotBoundError( "Can't call method on {0} not bound to a channel".format( type(self).__name__)) - return self._channel + if isinstance(channel, ChannelPromise): + channel = self._channel = channel() + return channel diff --git a/kombu/connection.py b/kombu/connection.py index dab942df..e23be740 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -33,20 +33,23 @@ from .utils import cached_property, retry_over_time, shufflecycle from .utils.compat import OrderedDict from .utils.url import parse_url +__all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] + RESOLVE_ALIASES = {'pyamqp': 'amqp', 'librabbitmq': 'amqp'} _LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False) _LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False) -__all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] +#: List of URI schemes that should not be parsed, but sent +#: directly to the transport instead. URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy', 'zeromq', 'zmq']) logger = get_logger(__name__) roundrobin_failover = cycle failover_strategies = { - 'round-robin': cycle, + 'round-robin': roundrobin_failover, 'shuffle': shufflecycle, } @@ -100,12 +103,12 @@ class Connection(object): .. note:: The connection is established lazily when needed. If you need the - connection to be established, then force it to do so using + connection to be established, then force it by calling :meth:`connect`:: >>> conn.connect() - Remember to always close the connection:: + and always remember to close the connection:: >>> conn.release() @@ -133,8 +136,8 @@ class Connection(object): #: of connection failure (initialized by :attr:`failover_strategy`). cycle = None - #: Additional transport specific options, passed on to the transport - #: instance. + #: Additional transport specific options, + #: passed on to the transport instance. transport_options = None #: Strategy used to select new hosts when reconnecting after connection @@ -1019,7 +1022,6 @@ class ChannelPool(Resource): def prepare(self, channel): if isinstance(channel, Callable): channel = channel() - return channel @@ -1029,3 +1031,7 @@ def maybe_channel(channel): if isinstance(channel, Connection): return channel.default_channel return channel + + +def is_connection(obj): + return isinstance(obj, Connection) diff --git a/kombu/messaging.py b/kombu/messaging.py index a05d9847..7b16628a 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -13,11 +13,11 @@ from __future__ import absolute_import from itertools import count from .compression import compress -from .connection import maybe_channel -from .entity import Exchange, Queue -from .five import text_t, values +from .connection import maybe_channel, is_connection +from .entity import Exchange, Queue, DELIVERY_MODES +from .five import int_types, text_t, values from .serialization import encode -from .utils import maybe_list +from .utils import ChannelPromise, maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] @@ -43,13 +43,11 @@ class Producer(object): Note that the producer needs to drain events to use this feature. """ - #: The connection channel used. - channel = None - #: Default exchange. + #: Default exchange exchange = None - # Default routing key. + #: Default routing key. routing_key = '' #: Default serializer to use. Default is JSON. @@ -66,22 +64,27 @@ class Producer(object): #: Basic return callback. on_return = None + #: Set if channel argument was a Connection instance (using + #: default_channel). + __connection__ = None + def __init__(self, channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None): - self.channel = channel - self.exchange = exchange or self.exchange - if self.exchange is None: - self.exchange = Exchange('') + self._channel = channel + self.exchange = exchange self.routing_key = routing_key or self.routing_key self.serializer = serializer or self.serializer self.compression = compression or self.compression self.on_return = on_return or self.on_return + self._channel_promise = None + if self.exchange is None: + self.exchange = Exchange('') if auto_declare is not None: self.auto_declare = auto_declare - if self.channel: - self.revive(self.channel) + if self._channel: + self.revive(self._channel) def __repr__(self): return '<Producer: {0.channel}>'.format(self) @@ -145,44 +148,74 @@ class Producer(object): retry_policy = {} if retry_policy is None else retry_policy routing_key = self.routing_key if routing_key is None else routing_key compression = self.compression if compression is None else compression + exchange = exchange or self.exchange if isinstance(exchange, Exchange): + delivery_mode = delivery_mode or exchange.delivery_mode exchange = exchange.name + else: + delivery_mode = delivery_mode or self.exchange.delivery_mode + if not isinstance(delivery_mode, int_types): + delivery_mode = DELIVERY_MODES[delivery_mode] + properties['delivery_mode'] = delivery_mode body, content_type, content_encoding = self._prepare( body, serializer, content_type, content_encoding, compression, headers) - message = self.exchange.Message(body, - delivery_mode, - priority, - content_type, - content_encoding, - headers=headers, - properties=properties) + publish = self._publish if retry: publish = self.connection.ensure(self, publish, **retry_policy) - return publish(message, routing_key, mandatory, - immediate, exchange, declare) - - def _publish(self, message, routing_key, mandatory, immediate, exchange, - declare): + return publish(body, priority, content_type, + content_encoding, headers, properties, + routing_key, mandatory, immediate, exchange, declare) + + def _publish(self, body, priority, content_type, content_encoding, + headers, properties, routing_key, mandatory, + immediate, exchange, declare): + channel = self.channel + message = channel.prepare_message( + body, priority, content_type, + content_encoding, headers, properties, + ) if declare: maybe_declare = self.maybe_declare [maybe_declare(entity) for entity in declare] - return self.exchange.publish(message, routing_key, - mandatory, immediate, exchange) + return channel.basic_publish(message, + exchange=exchange, routing_key=routing_key, + mandatory=mandatory, immediate=immediate, + ) + + def _get_channel(self): + channel = self._channel + if isinstance(channel, ChannelPromise): + channel = self._channel = channel() + self.exchange.revive(channel) + if self.on_return: + channel.events['basic_return'].add(self.on_return) + return channel + + def _set_channel(self, channel): + self._channel = channel + channel = property(_get_channel, _set_channel) def revive(self, channel): """Revive the producer after connection loss.""" - channel = self.channel = maybe_channel(channel) - self.exchange = self.exchange(channel) - self.exchange.revive(channel) - + if is_connection(channel): + promise = ChannelPromise(lambda: channel.default_channel) + self.__connection__ = channel + self._channel = promise + self.exchange = self.exchange(promise) + else: + # Channel already concrete + self._channel = channel + if self.on_return: + self._channel.events['basic_return'].add(self.on_return) + self.exchange = self.exchange(channel) if self.auto_declare: + # auto_decare is not recommended as this will force + # evaluation of the channel. self.declare() - if self.on_return: - self.channel.events['basic_return'].append(self.on_return) def __enter__(self): return self @@ -224,7 +257,7 @@ class Producer(object): @property def connection(self): try: - return self.channel.connection.client + return self.__connection__ or self.channel.connection.client except AttributeError: pass diff --git a/kombu/pools.py b/kombu/pools.py index 968a7a49..0cd03c41 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -55,13 +55,12 @@ class ProducerPool(Resource): if isinstance(p, Callable): p = p() if not p.channel: - connection = self._acquire_connection() - p.revive(connection.default_channel) + p.revive(self._acquire_connection()) return p def release(self, resource): - if resource.connection: - resource.connection.release() + if resource.__connection__: + resource.__connection__.release() resource.channel = None super(ProducerPool, self).release(resource) diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index a68431c4..6e1be5eb 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -28,7 +28,7 @@ class Channel(base.StdChannel): self.called = [] self.deliveries = count(1) self.to_deliver = [] - self.events = {'basic_return': []} + self.events = {'basic_return': set()} def _called(self, name): self.called.append(name) @@ -39,10 +39,10 @@ class Channel(base.StdChannel): def exchange_declare(self, *args, **kwargs): self._called('exchange_declare') - def prepare_message(self, message_data, properties={}, priority=0, - content_type=None, content_encoding=None, headers=None): + def prepare_message(self, body, priority=0, content_type=None, + content_encoding=None, headers=None, properties={}): self._called('prepare_message') - return dict(body=message_data, + return dict(body=body, headers=headers, properties=properties, priority=priority, diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 51991909..4c5fe619 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -108,16 +108,18 @@ class test_Producer(TestCase): def test_publish_with_Exchange_instance(self): p = self.connection.Producer() - p.exchange.publish = Mock() + p.channel = Mock() p.publish('hello', exchange=Exchange('foo')) - self.assertEqual(p.exchange.publish.call_args[0][4], 'foo') + self.assertEqual( + p._channel.basic_publish.call_args[1]['exchange'], 'foo', + ) def test_publish_retry_with_declare(self): p = self.connection.Producer() p.maybe_declare = Mock() p.connection.ensure = Mock() ex = Exchange('foo') - p._publish('hello', 'rk', 0, 0, ex, declare=[ex]) + p._publish('hello', 0, '', '', {}, {}, 'rk', 0, 0, ex, declare=[ex]) p.maybe_declare.assert_called_with(ex) def test_revive_when_channel_is_connection(self): @@ -141,6 +143,7 @@ class test_Producer(TestCase): def test_connection_property_handles_AttributeError(self): p = self.connection.Producer() p.channel = object() + p.__connection__ = None self.assertIsNone(p.connection) def test_publish(self): diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index 0c667f8e..32d1294f 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -61,7 +61,7 @@ class test_ProducerPool(TestCase): first = pool._resource.get_nowait() producer = pool.prepare(first) self.assertTrue(self.connections.acquire.called) - producer.revive.assert_called_with(connection.default_channel) + producer.revive.assert_called_with(connection) def test_prepare_channel_already_created(self): self.connections.acquire.return_value = Mock() @@ -79,8 +79,9 @@ class test_ProducerPool(TestCase): def test_release(self): p = Mock() p.channel = Mock() + p.__connection__ = Mock() self.pool.release(p) - p.connection.release.assert_called_with() + p.__connection__.release.assert_called_with() self.assertIsNone(p.channel) diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 14748859..9f9f467d 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -264,17 +264,17 @@ class Message(base.Message): class Channel(_Channel, base.StdChannel): Message = Message - events = {'basic_return': []} + events = {'basic_return': set()} def __init__(self, *args, **kwargs): self.no_ack_consumers = set() super(Channel, self).__init__(*args, **kwargs) - def prepare_message(self, message_data, priority=None, + def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): """Encapsulate data into a AMQP message.""" - return amqp.Message(message_data, priority=priority, + return amqp.Message(body, priority=priority, content_type=content_type, content_encoding=content_encoding, application_headers=headers, diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py index 5c80724e..ce60dd6c 100644 --- a/kombu/transport/pika.py +++ b/kombu/transport/pika.py @@ -109,7 +109,7 @@ class Channel(channel.Channel, base.StdChannel): queue, no_ack, False, consumer_tag) - def prepare_message(self, message_data, priority=None, + def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): properties = BasicProperties(priority=priority, @@ -117,7 +117,7 @@ class Channel(channel.Channel, base.StdChannel): content_encoding=content_encoding, headers=headers, **properties) - return message_data, properties + return body, properties def message_to_python(self, raw_message): return self.Message(channel=self, amqp_message=raw_message) diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index 1274be11..d03178e0 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -45,11 +45,11 @@ class Message(base.Message): class Channel(amqp.Channel, base.StdChannel): Message = Message - def prepare_message(self, message_data, priority=None, + def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): """Encapsulate data into a AMQP message.""" - return amqp.Message(message_data, priority=priority, + return amqp.Message(body, priority=priority, content_type=content_type, content_encoding=content_encoding, application_headers=headers, diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 91ee64dc..4bf13f85 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -595,7 +595,7 @@ class Channel(AbstractChannel, base.StdChannel): return self.Message(self, payload=raw_message) return raw_message - def prepare_message(self, message_data, priority=None, + def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): """Prepare message data.""" @@ -603,7 +603,7 @@ class Channel(AbstractChannel, base.StdChannel): info = properties.setdefault('delivery_info', {}) info['priority'] = priority or 0 - return {'body': message_data, + return {'body': body, 'content-encoding': content_encoding, 'content-type': content_type, 'headers': headers or {}, diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 0562cb5c..e18cc269 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -384,3 +384,19 @@ def entrypoints(namespace): except ImportError: return iter([]) return ((ep, ep.load()) for ep in iter_entry_points(namespace)) + + +class ChannelPromise(object): + + def __init__(self, contract): + self.__contract__ = contract + + def __call__(self): + try: + return self.__value__ + except AttributeError: + value = self.__value__ = self.__contract__() + return value + + def __repr__(self): + return '<promise: %r>' % (self(), ) |