diff options
author | Ask Solem <ask@celeryproject.org> | 2012-06-24 16:32:17 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-06-24 16:32:17 +0100 |
commit | fa816f6dc920aeb7b68b75f98fa1656da57d05c8 (patch) | |
tree | c2f2d076cc62ea62405fb6f267accab61d1f67c9 | |
parent | 4922c4aaea77be7a32a7d904104b055159e0da3e (diff) | |
download | kombu-fa816f6dc920aeb7b68b75f98fa1656da57d05c8.tar.gz |
BrokerConnection is now Connection in docs
36 files changed, 307 insertions, 376 deletions
@@ -1,7 +1,19 @@ +.. _changelog: + ================ Change history ================ +.. _version-2.2.3: + +2.2.3 +===== +:release-date: TBA + +- Connection.clone() + +.. _version-2.2.2: + 2.2.2 ===== :release-date: 2012-06-22 02:30 P.M BST @@ -42,7 +54,7 @@ - Exchange & Queue can now be bound to connections (which will use the default channel): - >>> exchange = Exchange("name") + >>> exchange = Exchange('name') >>> bound_exchange = exchange(connection) >>> bound_exchange.declare() @@ -140,8 +152,8 @@ News The timeout is set to one hour by default, but can be changed by configuring a transport option: - >>> BrokerConnection("redis://", transport_options={ - ... "visibility_timeout": 1800, # 30 minutes + >>> Connection('redis://', transport_options={ + ... 'visibility_timeout': 1800, # 30 minutes ... }) **NOTE**: Messages that have not been acked will be redelivered @@ -151,7 +163,7 @@ News twice (or more). If you plan on using long ETA/countdowns you should tweak the visibility timeout accordingly:: - BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 18000} # 5 hours + BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours Setting a long timeout means that it will take a long time for messages to be redelivered in the event of a power failure, @@ -179,8 +191,8 @@ News The number of steps can be configured by setting the ``priority_steps`` transport option, which must be a list of numbers in **sorted order**:: - >>> x = BrokerConnection("redis://", transport_options={ - ... "priority_steps": [0, 2, 4, 6, 8, 9], + >>> x = Connection('redis://', transport_options={ + ... 'priority_steps': [0, 2, 4, 6, 8, 9], ... }) Priorities implemented in this way is not as reliable as @@ -372,7 +384,7 @@ callback-based API later. Example:: - BrokerConnection("sqla+mysql://localhost/db") + Connection('sqla+mysql://localhost/db') * Better support for anonymous queues (Issue #116). @@ -490,8 +502,8 @@ callback-based API later. Example:: - >>> BrokerConnection("sqs://", transport_options={ - ... "polling_interval": 5.0}) + >>> Connection('sqs://', transport_options={ + ... 'polling_interval': 5.0}) The default interval is transport specific, but usually 1.0s (or 5.0s for the Django database transport, which @@ -556,7 +568,7 @@ New Transports to ``kombu.transport.django``:: INSTALLED_APPS = (…, - "kombu.transport.django") + 'kombu.transport.django') If you have previously used django-kombu, then there is no need to recreate the tables, as the old tables will be fully compatible @@ -582,8 +594,8 @@ News The queue prefix can be set using the transport option ``queue_name_prefix``:: - BrokerTransport("SQS://", transport_options={ - "queue_name_prefix": "myapp"}) + BrokerTransport('SQS://', transport_options={ + 'queue_name_prefix': 'myapp'}) Contributed by Nitzan Miron. @@ -592,12 +604,12 @@ News Retry is enabled by the ``reply`` argument, and retry options set by the ``retry_policy`` argument:: - exchange = Exchange("foo") + exchange = Exchange('foo') producer.publish(message, exchange=exchange, retry=True, declare=[exchange], retry_policy={ - "interval_start": 1.0}) + 'interval_start': 1.0}) - See :meth:`~kombu.connection.BrokerConnection.ensure` + See :meth:`~kombu.connection.Connection.ensure` for a list of supported retry policy options. * ``Producer.publish`` now supports a ``declare`` keyword argument. @@ -629,9 +641,9 @@ Fixes >>> from kombu.serialization import registry # by name - >>> registry.disable("pickle") + >>> registry.disable('pickle') # or by mime-type. - >>> registry.disable("application/x-python-serialize") + >>> registry.disable('application/x-python-serialize') .. _version-1.5.0: @@ -656,8 +668,8 @@ Fixes If wanted the dead-letter queue can still be enabled, by using the ``deadletter_queue`` transport option:: - >>> x = BrokerConnection("redis://", - ... transport_options={"deadletter_queue": "ae.undeliver"}) + >>> x = Connection('redis://', + ... transport_options={'deadletter_queue': 'ae.undeliver'}) In addition, an :class:`UndeliverableWarning` is now emitted when the dead-letter queue is enabled and a message ends up there. @@ -730,7 +742,7 @@ Fixes Fix contributed by Rafael Duran Castaneda -* BrokerConnection.Consumer had the wrong signature. +* Connection.Consumer had the wrong signature. Fix contributed by Pavel Skvazh @@ -778,12 +790,12 @@ Fixes The connections default channel will then be used. - In addition shortcut methods has been added to BrokerConnection:: + In addition shortcut methods has been added to Connection:: >>> connection.Producer(exchange) >>> connection.Consumer(queues=..., callbacks=...) -* BrokerConnection has aquired a ``connected`` attribute that +* Connection has aquired a ``connected`` attribute that can be used to check if the connection instance has established a connection. @@ -858,11 +870,11 @@ Fixes * Last release broke after fork for pool reinitialization. * Producer/Consumer now has a ``connection`` attribute, - giving access to the :class:`BrokerConnection` of the + giving access to the :class:`Connection` of the instance. * Pika: Channels now have access to the underlying - :class:`BrokerConnection` instance using ``channel.connection.client``. + :class:`Connection` instance using ``channel.connection.client``. This was previously required by the ``Simple`` classes and is now also required by :class:`Consumer` and :class:`Producer`. @@ -887,7 +899,7 @@ Fixes for example the default broker is expressed as:: - >>> BrokerConnection("amqp://guest:guest@localhost:5672//") + >>> Connection('amqp://guest:guest@localhost:5672//') Transport defaults to amqp, and is not required. user, password, port and virtual_host is also not mandatory and @@ -912,20 +924,20 @@ Fixes * Now comes with default global connection and producer pools. The acquire a connection using the connection parameters - from a :class:`BrokerConnection`:: + from a :class:`Connection`:: - >>> from kombu import BrokerConnection, connections - >>> connection = BrokerConnection("amqp://guest:guest@localhost//") + >>> from kombu import Connection, connections + >>> connection = Connection('amqp://guest:guest@localhost//') >>> with connections[connection].acquire(block=True): ... # do something with connection To acquire a producer using the connection parameters - from a :class:`BrokerConnection`:: + from a :class:`Connection`:: - >>> from kombu import BrokerConnection, producers - >>> connection = BrokerConnection("amqp://guest:guest@localhost//") + >>> from kombu import Connection, producers + >>> connection = Connection('amqp://guest:guest@localhost//') >>> with producers[connection].acquire(block=True): - ... producer.publish({"hello": "world"}, exchange="hello") + ... producer.publish({'hello': 'world'}, exchange='hello') Acquiring a producer will in turn also acquire a connection from the associated pool in ``connections``, so you the number @@ -998,7 +1010,7 @@ Fixes * Fixes MemoryError while importing ctypes on SELinux (Issue #52). -* ``BrokerConnection.autoretry`` is a version of ``ensure`` that works +* ``Connection.autoretry`` is a version of ``ensure`` that works with arbitrary functions (i.e. it does not need an associated object that implements the ``revive`` method. @@ -1016,7 +1028,7 @@ Fixes The connection will be established as needed. -* ``BrokerConnection.ensure`` now supports an ``on_revive`` callback +* ``Connection.ensure`` now supports an ``on_revive`` callback that is applied whenever the connection is re-established. * ``Consumer.consuming_from(queue)`` returns True if the Consumer is @@ -1062,8 +1074,8 @@ Fixes This can be disabled by setting the `supports_fanout` transport option: - >>> BrokerConnection(transport="SQS", - ... transport_options={"supports_fanout": False}) + >>> Connection(transport='SQS', + ... transport_options={'supports_fanout': False}) * SQS: Now properly deletes a message when a message is acked. @@ -1093,9 +1105,9 @@ Fixes Usage: - >>> conn = BrokerConnection(transport="SQS", - ... userid=aws_access_key_id, - ... password=aws_secret_access_key) + >>> conn = Connection(transport='SQS', + ... userid=aws_access_key_id, + ... password=aws_secret_access_key) The environment variables :envvar:`AWS_ACCESS_KEY_ID` and :envvar:`AWS_SECRET_ACCESS_KEY` are also supported. @@ -1104,7 +1116,7 @@ Fixes * amqplib transport: Now supports `login_method` for SSL auth. - :class:`BrokerConnection` now supports the `login_method` + :class:`Connection` now supports the `login_method` keyword argument. Default `login_method` is ``AMQPLAIN``. @@ -1186,8 +1198,8 @@ Important Notes If you need to disable base64 encoding then you can do so via the transport options:: - BrokerConnection(transport="...", - transport_options={"body_encoding": None}) + Connection(transport='...', + transport_options={'body_encoding': None}) **For transport authors**: @@ -1212,20 +1224,20 @@ Important Notes acquired next. I.e. if only a single thread is using the pool this means only a single connection will ever be used. -* BrokerConnection: Cloned connections did not inherit transport_options +* Connection: Cloned connections did not inherit transport_options (``__copy__``). * contrib/requirements is now located in the top directory of the distribution. * MongoDB: Now supports authentication using the ``userid`` and ``password`` - arguments to :class:`BrokerConnection` (Issue #30). + arguments to :class:`Connection` (Issue #30). -* BrokerConnection: Default autentication credentials are now delegated to +* Connection: Default autentication credentials are now delegated to the individual transports. - + This means that the ``userid`` and ``password`` arguments to - BrokerConnection is no longer *guest/guest* by default. + Connection is no longer *guest/guest* by default. The amqplib and pika transports will still have the default credentials. @@ -1265,7 +1277,7 @@ Important Notes * Redis: Creating a connection now ensures the connection is established. - This means ``BrokerConnection.ensure_connection`` works properly with + This means ``Connection.ensure_connection`` works properly with Redis. * consumer_tag argument to ``Queue.consume`` can't be :const:`None` @@ -1274,7 +1286,7 @@ Important Notes A None value is now automatically converted to empty string. An empty string will make the server generate a unique tag. -* BrokerConnection now supports a ``transport_options`` argument. +* Connection now supports a ``transport_options`` argument. This can be used to pass additional arguments to transports. @@ -1308,15 +1320,15 @@ Important Notes **Example Usage**:: $ KOMBU_LOG_DEBUG=1 python - >>> from kombu import BrokerConnection - >>> conn = BrokerConnection() + >>> from kombu import Connection + >>> conn = Connection() >>> channel = conn.channel() Start from server, version: 8.0, properties: {u'product': 'RabbitMQ',.............. } Open OK! known_hosts [] using channel_id: 1 Channel open - >>> channel.queue_declare("myq", passive=True) + >>> channel.queue_declare('myq', passive=True) [Kombu channel:1] queue_declare('myq', passive=True) (u'myq', 0, 1) @@ -124,22 +124,22 @@ Quick overview :: - from kombu import BrokerConnection, Exchange, Queue + from kombu import Connection, Exchange, Queue - media_exchange = Exchange("media", "direct", durable=True) - video_queue = Queue("video", exchange=media_exchange, routing_key="video") + media_exchange = Exchange('media', 'direct', durable=True) + video_queue = Queue('video', exchange=media_exchange, routing_key='video') def process_media(body, message): print body message.ack() # connections - with BrokerConnection("amqp://guest:guest@localhost//") as conn: + with Connection('amqp://guest:guest@localhost//') as conn: # produce - with conn.Producer(serializer="json") as producer: - producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013}, - exchange=media_exchange, routing_key="video", + with conn.Producer(serializer='json') as producer: + producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013}, + exchange=media_exchange, routing_key='video', declare=[video_queue]) # the declare above, makes sure the video queue is declared @@ -156,8 +156,8 @@ Quick overview conn.drain_events() # Consume from several queues on the same channel: - video_queue = Queue("video", exchange=media_exchange, key="video") - image_queue = Queue("image", exchange=media_exchange, key="image") + video_queue = Queue('video', exchange=media_exchange, key='video') + image_queue = Queue('image', exchange=media_exchange, key='image') with connection.Consumer([video_queue, image_queue], callbacks=[process_media]) as consumer: @@ -175,9 +175,9 @@ Or handle channels manually:: All objects can be used outside of with statements too, just remember to close the objects after use:: - from kombu import BrokerConnection, Consumer, Producer + from kombu import Connection, Consumer, Producer - connection = BrokerConnection() + connection = Connection() # ... connection.close() @@ -204,9 +204,9 @@ that connections default channel. :: - >>> exchange = Exchange("tasks", "direct") + >>> exchange = Exchange('tasks', 'direct') - >>> connection = BrokerConnection() + >>> connection = Connection() >>> bound_exchange = exchange(connection) >>> bound_exchange.delete() diff --git a/docs/reference/kombu.connection.rst b/docs/reference/kombu.connection.rst index 63c6b282..3b856bc5 100644 --- a/docs/reference/kombu.connection.rst +++ b/docs/reference/kombu.connection.rst @@ -10,7 +10,7 @@ Connection ---------- - .. autoclass:: BrokerConnection + .. autoclass:: Connection .. admonition:: Attributes @@ -44,8 +44,8 @@ .. seealso:: - The shortcut methods :meth:`BrokerConnection.Pool` and - :meth:`BrokerConnection.ChannelPool` is the recommended way + The shortcut methods :meth:`Connection.Pool` and + :meth:`Connection.ChannelPool` is the recommended way to instantiate these classes. .. autoclass:: ConnectionPool diff --git a/docs/reference/kombu.pidbox.rst b/docs/reference/kombu.pidbox.rst index f47bfd4c..ebc72cd0 100644 --- a/docs/reference/kombu.pidbox.rst +++ b/docs/reference/kombu.pidbox.rst @@ -28,7 +28,7 @@ .. code-block:: python - >>> connection = kombu.BrokerConnection() + >>> connection = kombu.Connection() >>> state = {"beat": beat, "connection": connection} >>> consumer = mailbox(connection).Node(hostname).listen() diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index a1d7b176..14ff4aad 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -15,12 +15,12 @@ and you can even create your own. The default transport is amqplib. Create a connection using the default transport:: - >>> from kombu import BrokerConnection - >>> connection = BrokerConnection("amqp://guest:guest@localhost:5672//") + >>> from kombu import Connection + >>> connection = Connection('amqp://guest:guest@localhost:5672//') The connection will not be established yet, as the connection is established when needed. If you want to explicitly establish the connection -you have to call the :meth:`~kombu.connection.BrokerConnection.connect` +you have to call the :meth:`~kombu.connection.Connection.connect` method:: >>> connection.connect() @@ -49,7 +49,7 @@ Of course, the connection can be used as a context, and you are encouraged to do so as it makes it harder to forget releasing open resources:: - with BrokerConnection() as connection: + with Connection() as connection: # work with connection .. _connection-urls: @@ -87,7 +87,7 @@ which is using the localhost host, default port, user name `guest`, password `guest` and virtual host "/". A connection without arguments is the same as:: - >>> BrokerConnection("amqp://guest:guest@localhost:5672//") + >>> Connection('amqp://guest:guest@localhost:5672//') The default port is transport specific, for AMQP this is 5672. @@ -100,7 +100,7 @@ the redis database number. Keyword arguments ================= -The :class:`BrokerConnection` class supports additional +The :class:`~kombu.connection.Connection` class supports additional keyword arguments, these are: :hostname: Default host name if not provided in the URL. diff --git a/docs/userguide/pools.rst b/docs/userguide/pools.rst index 79c959ed..d921bc46 100644 --- a/docs/userguide/pools.rst +++ b/docs/userguide/pools.rst @@ -31,25 +31,25 @@ instance to support multiple connections in the same app. All connection instances with the same connection parameters will get the same pool:: - >>> from kombu import BrokerConnection + >>> from kombu import Connection >>> from kombu.pools import connections - >>> connections[BrokerConnection("redis://localhost:6379")] + >>> connections[Connection('redis://localhost:6379')] <kombu.connection.ConnectionPool object at 0x101805650> - >>> connections[BrokerConnection("redis://localhost:6379")] + >>> connections[Connection('redis://localhost:6379')] <kombu.connection.ConnectionPool object at 0x101805650> Let's acquire and release a connection: .. code-block:: python - from kombu import BrokerConnection + from kombu import Connection from kombu.pools import connections - connection = BrokerConnection("redis://localhost:6379") + connection = Connection('redis://localhost:6379') with connections[connection].acquire(block=True) as conn: - print("Got connection: %r" % (connection.as_uri(), )) + print('Got connection: %r' % (connection.as_uri(), )) .. note:: @@ -69,11 +69,11 @@ at once you can do that too: .. code-block:: python - from kombu import BrokerConnection + from kombu import Connection from kombu.pools import connections - c1 = BrokerConnection("amqp://") - c2 = BrokerConnection("redis://") + c1 = Connection('amqp://') + c2 = Connection('redis://') with connections[c1].acquire(block=True) as conn1: with connections[c2].acquire(block=True) as conn2: @@ -93,27 +93,27 @@ to the ``news`` exchange: .. code-block:: python - from kombu import BrokerConnection, Exchange + from kombu import Connection, Exchange from kombu.common import maybe_declare from kombu.pools import producers # The exchange we send our news articles to. - news_exchange = Exchange("news") + news_exchange = Exchange('news') # The article we want to send - article = {"title": "No cellular coverage on the tube for 2012", - "ingress": "yadda yadda yadda"} + article = {'title': 'No cellular coverage on the tube for 2012', + 'ingress': 'yadda yadda yadda'} # The broker where our exchange is. - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") + connection = Connection('amqp://guest:guest@localhost:5672//') with producers[connection].acquire(block=True) as producer: # maybe_declare knows what entities have already been declared # so we don't have to do so multiple times in the same process. maybe_declare(news_exchange) - producer.publish(article, routing_key="domestic", - serializer="json", - compression="zlib") + producer.publish(article, routing_key='domestic', + serializer='json', + compression='zlib') .. _default-pool-limits: @@ -153,12 +153,12 @@ instances: .. code-block:: python from kombu import pools - from kombu import BrokerConnection + from kombu import Connection connections = pools.Connection(limit=100) producers = pools.Producers(limit=connections.limit) - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") + connection = Connection('amqp://guest:guest@localhost:5672//') with connections[connection].acquire(block=True): # ... diff --git a/docs/userguide/simple.rst b/docs/userguide/simple.rst index 650b8b28..81aeb641 100644 --- a/docs/userguide/simple.rst +++ b/docs/userguide/simple.rst @@ -18,10 +18,10 @@ two arguments, a connection channel and a name. The name is used as the queue, exchange and routing key. If the need arises, you can specify a :class:`~kombu.entity.Queue` as the name argument instead. -In addition, the :class:`~kombu.connection.BrokerConnection` comes with +In addition, the :class:`~kombu.connection.Connection` comes with shortcuts to create simple queues using the current connection:: - >>> queue = connection.SimpleQueue("myqueue") + >>> queue = connection.SimpleQueue('myqueue') >>> # ... do something with queue >>> queue.close() @@ -56,23 +56,23 @@ to produce and consume logging messages: from socket import gethostname from time import time - from kombu import BrokerConnection + from kombu import Connection class Logger(object): - def __init__(self, connection, queue_name="log_queue", - serializer="json", compression=None): + def __init__(self, connection, queue_name='log_queue', + serializer='json', compression=None): self.queue = connection.SimpleQueue(self.queue_name) self.serializer = serializer self.compression = compression - def log(self, message, level="INFO", context={}): - self.queue.put({"message": message, - "level": level, - "context": context, - "hostname": socket.gethostname(), - "timestamp": time()}, + def log(self, message, level='INFO', context={}): + self.queue.put({'message': message, + 'level': level, + 'context': context, + 'hostname': socket.gethostname(), + 'timestamp': time()}, serializer=self.serializer, compression=self.compression) @@ -87,28 +87,28 @@ to produce and consume logging messages: self.queue.close() - if __name__ == "__main__": + if __name__ == '__main__': from contextlib import closing - with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + with Connection('amqp://guest:guest@localhost:5672//') as conn: with closing(Logger(connection)) as logger: # Send message - logger.log("Error happened while encoding video", - level="ERROR", - context={"filename": "cutekitten.mpg"}) + logger.log('Error happened while encoding video', + level='ERROR', + context={'filename': 'cutekitten.mpg'}) # Consume and process message # This is the callback called when a log message is # received. def dump_entry(entry): - date = datetime.fromtimestamp(entry["timestamp"]) - print("[%s %s %s] %s %r" % (date, - entry["hostname"], - entry["level"], - entry["message"], - entry["context"])) + date = datetime.fromtimestamp(entry['timestamp']) + print('[%s %s %s] %s %r' % (date, + entry['hostname'], + entry['level'], + entry['message'], + entry['context'])) # Process a single message using the callback above. logger.process(dump_entry, n=1) diff --git a/examples/complete_receive.py b/examples/complete_receive.py index 416fa2d6..aa3987bf 100644 --- a/examples/complete_receive.py +++ b/examples/complete_receive.py @@ -2,15 +2,16 @@ Example of simple consumer that waits for a single message, acknowledges it and exits. """ + from __future__ import with_statement -from kombu import Connection, Exchange, Queue +from kombu import Connection, Exchange, Queue, Consumer, eventloop from pprint import pformat #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") +exchange = Exchange('kombu_demo', type='direct') +queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') def pretty(obj): @@ -19,17 +20,23 @@ def pretty(obj): #: This is the callback applied when a message is received. def handle_message(body, message): - print("Received message: %r" % (body, )) - print(" properties:\n%s" % (pretty(message.properties), )) - print(" delivery_info:\n%s" % (pretty(message.delivery_info), )) + print('Received message: %r' % (body, )) + print(' properties:\n%s' % (pretty(message.properties), )) + print(' delivery_info:\n%s' % (pretty(message.delivery_info), )) message.ack() -with Connection("amqp://guest:guest@localhost:5672//") as connection: +#: Create a connection and a channel. +#: If hostname, userid, password and virtual_host is not specified +#: the values below are the default, but listed here so it can +#: be easily changed. +with Connection('amqp://guest:guest@localhost:5672//') as connection: + #: Create consumer using our callback and queue. #: Second argument can also be a list to consume from #: any number of queues. - with connection.Consumer(queue, callbacks=[handle_message]): + with Consumer(connection, queue, callbacks=[handle_message]): + #: This waits for a single event. Note that this event may not #: be a message, or a message that is to be delivered to the consumers #: channel, but any event received on the connection. - connection.drain_events(timeout=10) + eventloop(connection, limit=1, timeout=10.0) diff --git a/examples/complete_receive_manual.py b/examples/complete_receive_manual.py deleted file mode 100644 index bf618e46..00000000 --- a/examples/complete_receive_manual.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Example of simple consumer that waits for a single message, acknowledges it -and exits. -""" - -from kombu import BrokerConnection, Exchange, Queue, Consumer -from pprint import pformat - -#: By default messages sent to exchanges are persistent (delivery_mode=2), -#: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") - - -def pretty(obj): - return pformat(obj, indent=4) - - -#: This is the callback applied when a message is received. -def handle_message(body, message): - print("Received message: %r" % (body, )) - print(" properties:\n%s" % (pretty(message.properties), )) - print(" delivery_info:\n%s" % (pretty(message.delivery_info), )) - message.ack() - -#: Create a connection and a channel. -#: If hostname, userid, password and virtual_host is not specified -#: the values below are the default, but listed here so it can -#: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") -channel = connection.channel() - -#: Create consumer using our callback and queue. -#: Second argument can also be a list to consume from -#: any number of queues. -consumer = Consumer(channel, queue, callbacks=[handle_message]) -consumer.consume() - -#: This waits for a single event. Note that this event may not -#: be a message, or a message that is to be delivered to the consumers -#: channel, but any event received on the connection. -connection.drain_events() diff --git a/examples/complete_send.py b/examples/complete_send.py index faa6a1a1..c81f8620 100644 --- a/examples/complete_send.py +++ b/examples/complete_send.py @@ -7,26 +7,26 @@ You can use `complete_receive.py` to receive the message sent. """ from __future__ import with_statement -from kombu import Connection, Exchange, Queue +from kombu import Connection, Producer, Exchange, Queue #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") +exchange = Exchange('kombu_demo', type='direct') +queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') -with Connection("amqp://guest:guest@localhost:5672//") as connection: +with Connection('amqp://guest:guest@localhost:5672//') as connection: #: Producers are used to publish messages. #: a default exchange and routing key can also be specifed #: as arguments the Producer, but we rather specify this explicitly #: at the publish call. - producer = connection.Producer() + producer = Producer(connection) #: Publish the message using the json serializer (which is the default), #: and zlib compression. The kombu consumer will automatically detect - #: encoding, serializiation and compression used and decode accordingly. - producer.publish({"hello": "world"}, + #: encoding, serialization and compression used and decode accordingly. + producer.publish({'hello': 'world'}, exchange=exchange, - routing_key="kombu_demo", - serializer="json", compression="zlib") + routing_key='kombu_demo', + serializer='json', compression='zlib') diff --git a/examples/complete_send_manual.py b/examples/complete_send_manual.py deleted file mode 100644 index 9e3b3252..00000000 --- a/examples/complete_send_manual.py +++ /dev/null @@ -1,35 +0,0 @@ -""" - -Example producer that sends a single message and exits. - -You can use `complete_receive.py` to receive the message sent. - -""" - -from kombu import BrokerConnection, Exchange, Queue, Producer - -#: By default messages sent to exchanges are persistent (delivery_mode=2), -#: and queues and exchanges are durable. -exchange = Exchange("kombu_demo", type="direct") -queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") - - -#: Create connection and channel. -#: If hostname, userid, password and virtual_host is not specified -#: the values below are the default, but listed here so it can -#: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") -channel = connection.channel() - -#: Producers are used to publish messages. -#: Routing keys can also be specifed as an argument to `publish`. -producer = Producer(channel, exchange, routing_key="kombu_demo") - -#: Publish the message using the json serializer (which is the default), -#: and zlib compression. The kombu consumer will automatically detect -#: encoding, serializiation and compression used and decode accordingly. -producer.publish({"hello": "world"}, serializer="json", - compression="zlib") diff --git a/examples/simple_eventlet_receive.py b/examples/simple_eventlet_receive.py index cce518cc..a8208650 100644 --- a/examples/simple_eventlet_receive.py +++ b/examples/simple_eventlet_receive.py @@ -6,12 +6,13 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the message sent. """ +from __future__ import with_statement + import eventlet -from eventlet import spawn from Queue import Empty -from kombu import BrokerConnection +from kombu import Connection eventlet.monkey_patch() @@ -22,25 +23,21 @@ def wait_many(timeout=1): #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") - - #: SimpleQueue mimics the interface of the Python Queue module. - #: First argument can either be a queue name or a kombu.Queue object. - #: If a name, then the queue will be declared with the name as the queue - #: name, exchange name and routing key. - queue = connection.SimpleQueue("kombu_demo") - - while True: - try: - message = queue.get(block=False, timeout=timeout) - except Empty: - break - else: - spawn(message.ack) - print(message.payload) - - queue.close() - connection.close() - + with Connection('amqp://guest:guest@localhost:5672//') as connection: + + #: SimpleQueue mimics the interface of the Python Queue module. + #: First argument can either be a queue name or a kombu.Queue object. + #: If a name, then the queue will be declared with the name as the + #: queue name, exchange name and routing key. + with connection.SimpleQueue('kombu_demo') as queue: + + while True: + try: + message = queue.get(block=False, timeout=timeout) + except Empty: + break + else: + message.ack() + print(message.payload) spawn(wait_many).wait() diff --git a/examples/simple_eventlet_send.py b/examples/simple_eventlet_send.py index 3d0641e1..f6259bf3 100644 --- a/examples/simple_eventlet_send.py +++ b/examples/simple_eventlet_send.py @@ -6,9 +6,11 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the message sent. """ +from __future__ import with_statement + import eventlet -from kombu import BrokerConnection +from kombu import Connection eventlet.monkey_patch() @@ -19,24 +21,22 @@ def send_many(n): #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") + with Connection('amqp://guest:guest@localhost:5672//') as connection: - #: SimpleQueue mimics the interface of the Python Queue module. - #: First argument can either be a queue name or a kombu.Queue object. - #: If a name, then the queue will be declared with the name as the queue - #: name, exchange name and routing key. - queue = connection.SimpleQueue("kombu_demo") + #: SimpleQueue mimics the interface of the Python Queue module. + #: First argument can either be a queue name or a kombu.Queue object. + #: If a name, then the queue will be declared with the name as the + #: queue name, exchange name and routing key. + with connection.SimpleQueue('kombu_demo') as queue: - def send_message(i): - queue.put({"hello": "world%s" % (i, )}) + def send_message(i): + queue.put({'hello': 'world%s' % (i, )}) - pool = eventlet.GreenPool(10) - for i in xrange(n): - pool.spawn(send_message, i) - pool.waitall() + pool = eventlet.GreenPool(10) + for i in xrange(n): + pool.spawn(send_message, i) + pool.waitall() - queue.close() - connection.close() -if __name__ == "__main__": +if __name__ == '__main__': send_many(10) diff --git a/examples/simple_receive.py b/examples/simple_receive.py index 9d53aa35..e9b933b1 100644 --- a/examples/simple_receive.py +++ b/examples/simple_receive.py @@ -4,19 +4,19 @@ Example receiving a message using the SimpleQueue interface. from __future__ import with_statement -from kombu import BrokerConnection +from kombu import Connection #: Create connection #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: +with Connection('amqp://guest:guest@localhost:5672//') as conn: #: SimpleQueue mimics the interface of the Python Queue module. #: First argument can either be a queue name or a kombu.Queue object. #: If a name, then the queue will be declared with the name as the queue #: name, exchange name and routing key. - with conn.SimpleQueue("kombu_demo") as queue: + with conn.SimpleQueue('kombu_demo') as queue: message = queue.get(block=True, timeout=10) message.ack() print(message.payload) diff --git a/examples/simple_send.py b/examples/simple_send.py index 76cdb9b5..e76fbcf0 100644 --- a/examples/simple_send.py +++ b/examples/simple_send.py @@ -8,20 +8,20 @@ message sent. """ from __future__ import with_statement -from kombu import BrokerConnection +from kombu import Connection #: Create connection #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: +with Connection('amqp://guest:guest@localhost:5672//') as conn: #: SimpleQueue mimics the interface of the Python Queue module. #: First argument can either be a queue name or a kombu.Queue object. #: If a name, then the queue will be declared with the name as the queue #: name, exchange name and routing key. - with conn.SimpleQueue("kombu_demo") as queue: - queue.put({"hello": "world"}, serializer="json", compression="zlib") + with conn.SimpleQueue('kombu_demo') as queue: + queue.put({'hello': 'world'}, serializer='json', compression='zlib') ##### diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py index 1ab6175a..e07b8c45 100644 --- a/examples/simple_task_queue/client.py +++ b/examples/simple_task_queue/client.py @@ -5,25 +5,25 @@ from kombu.pools import producers from queues import task_exchange -priority_to_routing_key = {"high": "hipri", - "mid": "midpri", - "low": "lopri"} +priority_to_routing_key = {'high': 'hipri', + 'mid': 'midpri', + 'low': 'lopri'} -def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"): - payload = {"fun": fun, "args": args, "kwargs": kwargs} +def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'): + payload = {'fun': fun, 'args': args, 'kwargs': kwargs} routing_key = priority_to_routing_key[priority] with producers[connection].acquire(block=True) as producer: maybe_declare(task_exchange, producer.channel) - producer.publish(payload, serializer="pickle", - compression="bzip2", + producer.publish(payload, serializer='pickle', + compression='bzip2', routing_key=routing_key) -if __name__ == "__main__": - from kombu import BrokerConnection +if __name__ == '__main__': + from kombu import Connection from tasks import hello_task - connection = BrokerConnection("amqp://guest:guest@localhost:5672//") - send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={}, - priority="high") + connection = Connection('amqp://guest:guest@localhost:5672//') + send_as_task(connection, fun=hello_task, args=('Kombu', ), kwargs={}, + priority='high') diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py index 680e7575..602c2b0e 100644 --- a/examples/simple_task_queue/queues.py +++ b/examples/simple_task_queue/queues.py @@ -1,6 +1,6 @@ from kombu import Exchange, Queue -task_exchange = Exchange("tasks", type="direct") -task_queues = [Queue("hipri", task_exchange, routing_key="hipri"), - Queue("midpri", task_exchange, routing_key="midpri"), - Queue("lopri", task_exchange, routing_key="lopri")] +task_exchange = Exchange('tasks', type='direct') +task_queues = [Queue('hipri', task_exchange, routing_key='hipri'), + Queue('midpri', task_exchange, routing_key='midpri'), + Queue('lopri', task_exchange, routing_key='lopri')] diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 063a6b4d..3d933c4b 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -16,23 +16,23 @@ class Worker(ConsumerMixin): callbacks=[self.process_task])] def process_task(self, body, message): - fun = body["fun"] - args = body["args"] - kwargs = body["kwargs"] - self.info("Got task: %s", reprcall(fun.__name__, args, kwargs)) + fun = body['fun'] + args = body['args'] + kwargs = body['kwargs'] + self.info('Got task: %s', reprcall(fun.__name__, args, kwargs)) try: fun(*args, **kwdict(kwargs)) except Exception, exc: - self.error("task raised exception: %r", exc) + self.error('task raised exception: %r', exc) message.ack() -if __name__ == "__main__": - from kombu import BrokerConnection +if __name__ == '__main__': + from kombu import Connection from kombu.utils.debug import setup_logging - setup_logging(loglevel="INFO") + setup_logging(loglevel='INFO') - with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + with Connection('amqp://guest:guest@localhost:5672//') as conn: try: Worker(conn).run() except KeyboardInterrupt: - print("bye bye") + print('bye bye') diff --git a/funtests/transport.py b/funtests/transport.py index 76a0e010..5924a72a 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -9,7 +9,7 @@ import weakref from nose import SkipTest -from kombu import BrokerConnection +from kombu import Connection from kombu import Exchange, Queue from kombu.tests.utils import skip_if_quick @@ -104,7 +104,7 @@ class TransportCase(unittest.TestCase): options.setdefault("userid", self.userid) if self.password: options.setdefault("password", self.password) - return BrokerConnection(transport=self.transport, **options) + return Connection(transport=self.transport, **options) def do_connect(self): self.connection = self.get_connection(**self.connection_options) diff --git a/kombu/abstract.py b/kombu/abstract.py index daa4077f..97a50103 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -85,7 +85,7 @@ class MaybeChannelBound(Object): def revive(self, channel): """Revive channel after the connection has been re-established. - Used by :meth:`~kombu.connection.BrokerConnection.ensure`. + Used by :meth:`~kombu.Connection.ensure`. """ if self.is_bound: diff --git a/kombu/common.py b/kombu/common.py index 4ecfc2af..42bf0a67 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -26,7 +26,8 @@ from .utils import uuid __all__ = ['Broadcast', 'maybe_declare', 'uuid', 'itermessages', 'send_reply', 'isend_reply', - 'collect_replies', 'insured', 'ipublish'] + 'collect_replies', 'insured', 'ipublish', 'drain_consumer', + 'eventloop'] insured_logger = Log('kombu.insurance') diff --git a/kombu/connection.py b/kombu/connection.py index 03fdc2eb..ff049c81 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -32,14 +32,13 @@ from .utils.url import parse_url _LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False) _LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False) -__all__ = ['parse_url', 'BrokerConnection', 'Resource', - 'ConnectionPool', 'ChannelPool'] +__all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy']) logger = get_logger(__name__) -class BrokerConnection(object): +class Connection(object): """A connection to the broker. :param URL: Connection URL. @@ -552,7 +551,7 @@ class BrokerConnection(object): def __repr__(self): """``x.__repr__() <==> repr(x)``""" - return '<BrokerConnection: %s at 0x%x>' % (self.as_uri(), id(self)) + return '<Connection: %s at 0x%x>' % (self.as_uri(), id(self)) def __copy__(self): """``x.__copy__() <==> copy(x)``""" @@ -634,7 +633,7 @@ class BrokerConnection(object): @property def is_evented(self): return getattr(self.transport, 'on_poll_start', None) -Connection = BrokerConnection +BrokerConnection = Connection class Resource(object): @@ -851,6 +850,6 @@ class ChannelPool(Resource): def maybe_channel(channel): """Returns channel, or returns the default_channel if it's a connection.""" - if isinstance(channel, BrokerConnection): + if isinstance(channel, Connection): return channel.default_channel return channel diff --git a/kombu/mixins.py b/kombu/mixins.py index 25168b03..50f1d499 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -33,7 +33,7 @@ class ConsumerMixin(LogMixin): (eventlet/gevent) too. The basic class would need a :attr:`connection` attribute - which must be a :class:`~kombu.connection.BrokerConnection` instance, + which must be a :class:`~kombu.Connection` instance, and define a :meth:`get_consumers` method that returns a list of :class:`kombu.messaging.Consumer` instances to use. Supporting multiple consumers is important so that multiple diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index eb464855..ae8a726f 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -5,8 +5,8 @@ import pickle from nose import SkipTest -from kombu.connection import BrokerConnection, Resource, parse_url -from kombu.messaging import Consumer, Producer +from kombu import Connection, Consumer, Producer, parse_url +from kombu.connection import Resource from .mocks import Transport from .utils import TestCase @@ -36,7 +36,7 @@ class test_connection_utils(TestCase): self.assertEqual(result['hostname'], 'example.com/') def test_parse_generated_as_uri(self): - conn = BrokerConnection(self.url) + conn = Connection(self.url) info = conn.info() for k, v in self.expected.items(): self.assertEqual(info[k], v) @@ -46,12 +46,12 @@ class test_connection_utils(TestCase): @skip_if_not_module('pymongo') def test_as_uri_when_mongodb(self): - x = BrokerConnection('mongodb://localhost') + x = Connection('mongodb://localhost') self.assertTrue(x.as_uri()) def test_bogus_scheme(self): with self.assertRaises(KeyError): - BrokerConnection('bogus://localhost:7421').transport + Connection('bogus://localhost:7421').transport def assert_info(self, conn, **fields): info = conn.info() @@ -60,79 +60,77 @@ class test_connection_utils(TestCase): def test_rabbitmq_example_urls(self): # see Appendix A of http://www.rabbitmq.com/uri-spec.html - C = BrokerConnection self.assert_info( - C('amqp://user:pass@host:10000/vhost'), + Connection('amqp://user:pass@host:10000/vhost'), userid='user', password='pass', hostname='host', port=10000, virtual_host='vhost') self.assert_info( - C('amqp://user%61:%61pass@ho%61st:10000/v%2fhost'), + Connection('amqp://user%61:%61pass@ho%61st:10000/v%2fhost'), userid='usera', password='apass', hostname='hoast', port=10000, virtual_host='v/host') self.assert_info( - C('amqp://'), + Connection('amqp://'), userid='guest', password='guest', hostname='localhost', port=5672, virtual_host='/') self.assert_info( - C('amqp://:@/'), + Connection('amqp://:@/'), userid='guest', password='guest', hostname='localhost', port=5672, virtual_host='/') self.assert_info( - C('amqp://user@/'), + Connection('amqp://user@/'), userid='user', password='guest', hostname='localhost', port=5672, virtual_host='/') self.assert_info( - C('amqp://user:pass@/'), + Connection('amqp://user:pass@/'), userid='user', password='pass', hostname='localhost', port=5672, virtual_host='/') self.assert_info( - C('amqp://host'), + Connection('amqp://host'), userid='guest', password='guest', hostname='host', port=5672, virtual_host='/') self.assert_info( - C('amqp://:10000'), + Connection('amqp://:10000'), userid='guest', password='guest', hostname='localhost', port=10000, virtual_host='/') self.assert_info( - C('amqp:///vhost'), + Connection('amqp:///vhost'), userid='guest', password='guest', hostname='localhost', port=5672, virtual_host='vhost') self.assert_info( - C('amqp://host/'), + Connection('amqp://host/'), userid='guest', password='guest', hostname='host', port=5672, virtual_host='/') self.assert_info( - C('amqp://host/%2f'), + Connection('amqp://host/%2f'), userid='guest', password='guest', hostname='host', port=5672, virtual_host='/') def test_url_IPV6(self): - C = BrokerConnection raise SkipTest("urllib can't parse ipv6 urls") self.assert_info( - C('amqp://[::1]'), + Connection('amqp://[::1]'), userid='guest', password='guest', hostname='[::1]', port=5672, virtual_host='/') @@ -141,7 +139,7 @@ class test_connection_utils(TestCase): class test_Connection(TestCase): def setUp(self): - self.conn = BrokerConnection(port=5672, transport=Transport) + self.conn = Connection(port=5672, transport=Transport) def test_establish_connection(self): conn = self.conn @@ -177,7 +175,7 @@ class test_Connection(TestCase): def close_connection(self, connection): raise _CustomError('foo') - conn = BrokerConnection(transport=MyTransport) + conn = Connection(transport=MyTransport) conn.connect() conn.close() self.assertTrue(conn._closed) @@ -190,7 +188,7 @@ class test_Connection(TestCase): def test_close_when_default_channel_close_raises(self): - class Conn(BrokerConnection): + class Conn(Connection): @property def connection_errors(self): @@ -299,7 +297,7 @@ class test_Connection(TestCase): class MyTransport(Transport): channel_errors = (KeyError, ValueError) - conn = BrokerConnection(transport=MyTransport) + conn = Connection(transport=MyTransport) self.assertTupleEqual(conn.channel_errors, (KeyError, ValueError)) def test_connection_errors(self): @@ -307,7 +305,7 @@ class test_Connection(TestCase): class MyTransport(Transport): connection_errors = (KeyError, ValueError) - conn = BrokerConnection(transport=MyTransport) + conn = Connection(transport=MyTransport) self.assertTupleEqual(conn.connection_errors, (KeyError, ValueError)) @@ -316,8 +314,8 @@ class test_Connection_with_transport_options(TestCase): transport_options = {'pool_recycler': 3600, 'echo': True} def setUp(self): - self.conn = BrokerConnection(port=5672, transport=Transport, - transport_options=self.transport_options) + self.conn = Connection(port=5672, transport=Transport, + transport_options=self.transport_options) def test_establish_connection(self): conn = self.conn @@ -433,8 +431,7 @@ class test_ConnectionPool(ResourceCase): abstract = False def create_resource(self, limit, preload): - return BrokerConnection(port=5672, transport=Transport) \ - .Pool(limit, preload) + return Connection(port=5672, transport=Transport).Pool(limit, preload) def test_setup(self): P = self.create_resource(10, 2) @@ -450,7 +447,7 @@ class test_ConnectionPool(ResourceCase): def test_prepare_not_callable(self): P = self.create_resource(None, None) - conn = BrokerConnection('memory://') + conn = Connection('memory://') self.assertIs(P.prepare(conn), conn) def test_acquire_channel(self): @@ -463,7 +460,7 @@ class test_ChannelPool(ResourceCase): abstract = False def create_resource(self, limit, preload): - return BrokerConnection(port=5672, transport=Transport) \ + return Connection(port=5672, transport=Transport) \ .ChannelPool(limit, preload) def test_setup(self): @@ -481,6 +478,6 @@ class test_ChannelPool(ResourceCase): def test_prepare_not_callable(self): P = self.create_resource(10, 0) - conn = BrokerConnection('memory://') + conn = Connection('memory://') chan = conn.default_channel self.assertIs(P.prepare(chan), chan) diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 783999a8..bb9f4610 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -5,7 +5,7 @@ import anyjson from mock import patch -from kombu.connection import BrokerConnection +from kombu import Connection from kombu.exceptions import MessageStateError from kombu.messaging import Consumer, Producer from kombu.entity import Exchange, Queue @@ -19,7 +19,7 @@ class test_Producer(TestCase): def setUp(self): self.exchange = Exchange('foo', 'direct') - self.connection = BrokerConnection(transport=Transport) + self.connection = Connection(transport=Transport) self.connection.connect() self.assertTrue(self.connection.connection.connected) self.assertFalse(self.exchange.is_bound) @@ -124,7 +124,7 @@ class test_Producer(TestCase): def test_revive_when_channel_is_connection(self): p = self.connection.Producer() p.exchange = Mock() - new_conn = BrokerConnection('memory://') + new_conn = Connection('memory://') defchan = new_conn.default_channel p.revive(new_conn) @@ -188,7 +188,7 @@ class test_Producer(TestCase): class test_Consumer(TestCase): def setUp(self): - self.connection = BrokerConnection(transport=Transport) + self.connection = Connection(transport=Transport) self.connection.connect() self.assertTrue(self.connection.connection.connected) self.exchange = Exchange('foo', 'direct') diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py index 72d974ea..6199efa7 100644 --- a/kombu/tests/test_pidbox.py +++ b/kombu/tests/test_pidbox.py @@ -3,8 +3,8 @@ from __future__ import with_statement import socket +from kombu import Connection from kombu import pidbox -from kombu.connection import BrokerConnection from kombu.utils import uuid from .utils import TestCase @@ -24,7 +24,7 @@ class test_Mailbox(TestCase): return 'COLLECTED' self.mailbox = Mailbox('test_pidbox') - self.connection = BrokerConnection(transport='memory') + self.connection = Connection(transport='memory') self.state = {'var': 1} self.handlers = {'mymethod': self._handler} self.bound = self.mailbox(self.connection) diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py index 7580e650..26b09080 100644 --- a/kombu/tests/test_simple.py +++ b/kombu/tests/test_simple.py @@ -3,7 +3,7 @@ from __future__ import with_statement from Queue import Empty -from kombu import BrokerConnection, Exchange, Queue +from kombu import Connection, Exchange, Queue from .utils import TestCase from .utils import Mock @@ -25,7 +25,7 @@ class SimpleBase(TestCase): def setUp(self): if not self.abstract: - self.connection = BrokerConnection(transport='memory') + self.connection = Connection(transport='memory') self.q = self.Queue(None, no_ack=True) def tearDown(self): diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py index 803af460..6d40a001 100644 --- a/kombu/tests/transport/test_amqplib.py +++ b/kombu/tests/transport/test_amqplib.py @@ -3,7 +3,7 @@ from __future__ import absolute_import import sys from kombu.transport import amqplib -from kombu.connection import BrokerConnection +from kombu.connection import Connection from kombu.tests.utils import TestCase from kombu.tests.utils import mask_modules, Mock @@ -71,7 +71,7 @@ class test_Channel(TestCase): class test_Transport(TestCase): def setUp(self): - self.connection = BrokerConnection('amqplib://') + self.connection = Connection('amqplib://') self.transport = self.connection.transport def test_create_channel(self): @@ -134,7 +134,7 @@ class test_amqplib(TestCase): class Transport(amqplib.Transport): Connection = MockConnection - c = BrokerConnection(port=None, transport=Transport).connect() + c = Connection(port=None, transport=Transport).connect() self.assertEqual(c['host'], '127.0.0.1:%s' % (Transport.default_port, )) @@ -143,5 +143,5 @@ class test_amqplib(TestCase): class Transport(amqplib.Transport): Connection = MockConnection - c = BrokerConnection(port=1337, transport=Transport).connect() + c = Connection(port=1337, transport=Transport).connect() self.assertEqual(c['host'], '127.0.0.1:1337') diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py index 691e132c..e369449f 100644 --- a/kombu/tests/transport/test_base.py +++ b/kombu/tests/transport/test_base.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from __future__ import with_statement -from kombu import BrokerConnection, Consumer, Producer, Queue +from kombu import Connection, Consumer, Producer, Queue from kombu.transport.base import Message, StdChannel, Transport from kombu.tests.utils import TestCase @@ -11,7 +11,7 @@ from kombu.tests.utils import Mock class test_StdChannel(TestCase): def setUp(self): - self.conn = BrokerConnection('memory://') + self.conn = Connection('memory://') self.channel = self.conn.channel() self.channel.queues.clear() self.conn.connection.state.clear() @@ -40,7 +40,7 @@ class test_StdChannel(TestCase): class test_Message(TestCase): def setUp(self): - self.conn = BrokerConnection('memory://') + self.conn = Connection('memory://') self.channel = self.conn.channel() self.message = Message(self.channel, delivery_tag=313) diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py index 2865d6a4..306a9e02 100644 --- a/kombu/tests/transport/test_memory.py +++ b/kombu/tests/transport/test_memory.py @@ -3,7 +3,7 @@ from __future__ import with_statement import socket -from kombu.connection import BrokerConnection +from kombu.connection import Connection from kombu.entity import Exchange, Queue from kombu.messaging import Consumer, Producer @@ -13,7 +13,7 @@ from kombu.tests.utils import TestCase class test_MemoryTransport(TestCase): def setUp(self): - self.c = BrokerConnection(transport='memory') + self.c = Connection(transport='memory') self.e = Exchange('test_transport_memory') self.q = Queue('test_transport_memory', exchange=self.e, diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py index 1d4145be..b4aa7618 100644 --- a/kombu/tests/transport/test_mongodb.py +++ b/kombu/tests/transport/test_mongodb.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from nose import SkipTest -from kombu.connection import BrokerConnection +from kombu.connection import Connection from kombu.tests.utils import TestCase, skip_if_not_module @@ -27,23 +27,23 @@ class test_mongodb(TestCase): Connection = MockConnection url = 'mongodb://' - c = BrokerConnection(url, transport=Transport).connect() + c = Connection(url, transport=Transport).connect() client = c.channels[0].client self.assertEquals(client.name, 'kombu_default') self.assertEquals(client.connection.host, '127.0.0.1') url = 'mongodb://localhost' - c = BrokerConnection(url, transport=Transport).connect() + c = Connection(url, transport=Transport).connect() client = c.channels[0].client self.assertEquals(client.name, 'kombu_default') url = 'mongodb://localhost/dbname' - c = BrokerConnection(url, transport=Transport).connect() + c = Connection(url, transport=Transport).connect() client = c.channels[0].client self.assertEquals(client.name, 'dbname') url = 'mongodb://localhost,example.org:29017/dbname' - c = BrokerConnection(url, transport=Transport).connect() + c = Connection(url, transport=Transport).connect() client = c.channels[0].client nodes = client.connection.nodes @@ -53,15 +53,15 @@ class test_mongodb(TestCase): # Passing options breaks kombu's _init_params method # url = 'mongodb://localhost,localhost2:29017/dbname?safe=true' - # c = BrokerConnection(url, transport=Transport).connect() + # c = Connection(url, transport=Transport).connect() # client = c.channels[0].client url = 'mongodb://localhost:27017,localhost2:29017/dbname' - c = BrokerConnection(url, transport=Transport).connect() + c = Connection(url, transport=Transport).connect() client = c.channels[0].client url = 'mongodb://username:password@localhost/dbname' - c = BrokerConnection(url, transport=Transport).connect() + c = Connection(url, transport=Transport).connect() # Assuming there's no user 'username' with password 'password' # configured in mongodb diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 3d9fb361..90d4d8b4 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -9,7 +9,7 @@ from collections import defaultdict from itertools import count from Queue import Empty, Queue as _Queue -from kombu.connection import BrokerConnection +from kombu.connection import Connection from kombu.entity import Exchange, Queue from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.messaging import Consumer, Producer @@ -217,7 +217,7 @@ class Transport(redis.Transport): class test_Channel(TestCase): def setUp(self): - self.connection = BrokerConnection(transport=Transport) + self.connection = Connection(transport=Transport) self.channel = self.connection.channel() def test_basic_consume_when_fanout_queue(self): @@ -468,7 +468,7 @@ class test_Channel(TestCase): class test_Redis(TestCase): def setUp(self): - self.connection = BrokerConnection(transport=Transport) + self.connection = Connection(transport=Transport) self.exchange = Exchange('test_Redis', type='direct') self.queue = Queue('test_Redis', self.exchange, 'test_Redis') @@ -489,7 +489,7 @@ class test_Redis(TestCase): self.assertIsNone(self.queue(channel).get()) def test_publish__consume(self): - connection = BrokerConnection(transport=Transport) + connection = Connection(transport=Transport) channel = connection.channel() producer = Producer(channel, self.exchange, routing_key='test_Redis') consumer = Consumer(channel, self.queue) @@ -526,45 +526,45 @@ class test_Redis(TestCase): channel.close() def test_db_values(self): - c1 = BrokerConnection(virtual_host=1, + c1 = Connection(virtual_host=1, transport=Transport).channel() self.assertEqual(c1.client.db, 1) - c2 = BrokerConnection(virtual_host='1', + c2 = Connection(virtual_host='1', transport=Transport).channel() self.assertEqual(c2.client.db, 1) - c3 = BrokerConnection(virtual_host='/1', + c3 = Connection(virtual_host='/1', transport=Transport).channel() self.assertEqual(c3.client.db, 1) with self.assertRaises(Exception): - BrokerConnection(virtual_host='/foo', - transport=Transport).channel() + Connection(virtual_host='/foo', + transport=Transport).channel() def test_db_port(self): - c1 = BrokerConnection(port=None, transport=Transport).channel() + c1 = Connection(port=None, transport=Transport).channel() self.assertEqual(c1.client.port, Transport.default_port) c1.close() - c2 = BrokerConnection(port=9999, transport=Transport).channel() + c2 = Connection(port=9999, transport=Transport).channel() self.assertEqual(c2.client.port, 9999) c2.close() def test_close_poller_not_active(self): - c = BrokerConnection(transport=Transport).channel() + c = Connection(transport=Transport).channel() cycle = c.connection.cycle c.client.connection c.close() self.assertNotIn(c, cycle._channels) def test_close_ResponseError(self): - c = BrokerConnection(transport=Transport).channel() + c = Connection(transport=Transport).channel() c.client.bgsave_raises_ResponseError = True c.close() def test_close_disconnects(self): - c = BrokerConnection(transport=Transport).channel() + c = Connection(transport=Transport).channel() conn1 = c.client.connection conn2 = c.subclient.connection c.close() @@ -583,7 +583,7 @@ class test_Redis(TestCase): @module_exists(myredis, exceptions) def _do_test(): - conn = BrokerConnection(transport=Transport) + conn = Connection(transport=Transport) chan = conn.channel() self.assertTrue(chan.Client) self.assertTrue(chan.ResponseError) diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index d4a5fe3f..e08c4c14 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -5,7 +5,7 @@ import warnings from mock import patch -from kombu.connection import BrokerConnection +from kombu import Connection from kombu.exceptions import StdChannelError from kombu.transport import virtual from kombu.utils import uuid @@ -16,12 +16,11 @@ from kombu.tests.utils import Mock, redirect_stdouts def client(**kwargs): - return BrokerConnection(transport='kombu.transport.virtual.Transport', - **kwargs) + return Connection(transport='kombu.transport.virtual.Transport', **kwargs) def memory_client(): - return BrokerConnection(transport='memory') + return Connection(transport='memory') class test_BrokerState(TestCase): diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py index 50498021..d1b2a85a 100644 --- a/kombu/tests/transport/virtual/test_exchange.py +++ b/kombu/tests/transport/virtual/test_exchange.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from __future__ import with_statement -from kombu import BrokerConnection +from kombu import Connection from kombu.transport.virtual import exchange from kombu.tests.mocks import Transport @@ -14,8 +14,7 @@ class ExchangeCase(TestCase): def setUp(self): if self.type: - self.e = self.type(BrokerConnection(transport=Transport) - .channel()) + self.e = self.type(Connection(transport=Transport).channel()) class test_Direct(ExchangeCase): diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 8ae44419..bc934a84 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -177,7 +177,7 @@ class Transport(object): """Base class for transports.""" Management = Management - #: The :class:`~kombu.connection.BrokerConnection` owning this instance. + #: The :class:`~kombu.Connection` owning this instance. client = None #: Default port used when no port has been specified. diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 22d345af..d08aa757 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -656,7 +656,7 @@ class Management(base.Management): class Transport(base.Transport): """Virtual transport. - :param client: :class:`~kombu.connection.BrokerConnection` instance + :param client: :class:`~kombu.Connection` instance """ Channel = Channel |