summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-06-24 16:32:17 +0100
committerAsk Solem <ask@celeryproject.org>2012-06-24 16:32:17 +0100
commitfa816f6dc920aeb7b68b75f98fa1656da57d05c8 (patch)
treec2f2d076cc62ea62405fb6f267accab61d1f67c9
parent4922c4aaea77be7a32a7d904104b055159e0da3e (diff)
downloadkombu-fa816f6dc920aeb7b68b75f98fa1656da57d05c8.tar.gz
BrokerConnection is now Connection in docs
-rw-r--r--Changelog116
-rw-r--r--README.rst26
-rw-r--r--docs/reference/kombu.connection.rst6
-rw-r--r--docs/reference/kombu.pidbox.rst2
-rw-r--r--docs/userguide/connections.rst12
-rw-r--r--docs/userguide/pools.rst38
-rw-r--r--docs/userguide/simple.rst44
-rw-r--r--examples/complete_receive.py25
-rw-r--r--examples/complete_receive_manual.py45
-rw-r--r--examples/complete_send.py18
-rw-r--r--examples/complete_send_manual.py35
-rw-r--r--examples/simple_eventlet_receive.py41
-rw-r--r--examples/simple_eventlet_send.py32
-rw-r--r--examples/simple_receive.py6
-rw-r--r--examples/simple_send.py8
-rw-r--r--examples/simple_task_queue/client.py24
-rw-r--r--examples/simple_task_queue/queues.py8
-rw-r--r--examples/simple_task_queue/worker.py20
-rw-r--r--funtests/transport.py4
-rw-r--r--kombu/abstract.py2
-rw-r--r--kombu/common.py3
-rw-r--r--kombu/connection.py11
-rw-r--r--kombu/mixins.py2
-rw-r--r--kombu/tests/test_connection.py59
-rw-r--r--kombu/tests/test_messaging.py8
-rw-r--r--kombu/tests/test_pidbox.py4
-rw-r--r--kombu/tests/test_simple.py4
-rw-r--r--kombu/tests/transport/test_amqplib.py8
-rw-r--r--kombu/tests/transport/test_base.py6
-rw-r--r--kombu/tests/transport/test_memory.py4
-rw-r--r--kombu/tests/transport/test_mongodb.py16
-rw-r--r--kombu/tests/transport/test_redis.py30
-rw-r--r--kombu/tests/transport/virtual/test_base.py7
-rw-r--r--kombu/tests/transport/virtual/test_exchange.py5
-rw-r--r--kombu/transport/base.py2
-rw-r--r--kombu/transport/virtual/__init__.py2
36 files changed, 307 insertions, 376 deletions
diff --git a/Changelog b/Changelog
index bfc4b539..4f064248 100644
--- a/Changelog
+++ b/Changelog
@@ -1,7 +1,19 @@
+.. _changelog:
+
================
Change history
================
+.. _version-2.2.3:
+
+2.2.3
+=====
+:release-date: TBA
+
+- Connection.clone()
+
+.. _version-2.2.2:
+
2.2.2
=====
:release-date: 2012-06-22 02:30 P.M BST
@@ -42,7 +54,7 @@
- Exchange & Queue can now be bound to connections (which will use the default
channel):
- >>> exchange = Exchange("name")
+ >>> exchange = Exchange('name')
>>> bound_exchange = exchange(connection)
>>> bound_exchange.declare()
@@ -140,8 +152,8 @@ News
The timeout is set to one hour by default, but
can be changed by configuring a transport option:
- >>> BrokerConnection("redis://", transport_options={
- ... "visibility_timeout": 1800, # 30 minutes
+ >>> Connection('redis://', transport_options={
+ ... 'visibility_timeout': 1800, # 30 minutes
... })
**NOTE**: Messages that have not been acked will be redelivered
@@ -151,7 +163,7 @@ News
twice (or more). If you plan on using long ETA/countdowns you
should tweak the visibility timeout accordingly::
- BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 18000} # 5 hours
+ BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
Setting a long timeout means that it will take a long time
for messages to be redelivered in the event of a power failure,
@@ -179,8 +191,8 @@ News
The number of steps can be configured by setting the ``priority_steps``
transport option, which must be a list of numbers in **sorted order**::
- >>> x = BrokerConnection("redis://", transport_options={
- ... "priority_steps": [0, 2, 4, 6, 8, 9],
+ >>> x = Connection('redis://', transport_options={
+ ... 'priority_steps': [0, 2, 4, 6, 8, 9],
... })
Priorities implemented in this way is not as reliable as
@@ -372,7 +384,7 @@ callback-based API later.
Example::
- BrokerConnection("sqla+mysql://localhost/db")
+ Connection('sqla+mysql://localhost/db')
* Better support for anonymous queues (Issue #116).
@@ -490,8 +502,8 @@ callback-based API later.
Example::
- >>> BrokerConnection("sqs://", transport_options={
- ... "polling_interval": 5.0})
+ >>> Connection('sqs://', transport_options={
+ ... 'polling_interval': 5.0})
The default interval is transport specific, but usually
1.0s (or 5.0s for the Django database transport, which
@@ -556,7 +568,7 @@ New Transports
to ``kombu.transport.django``::
INSTALLED_APPS = (…,
- "kombu.transport.django")
+ 'kombu.transport.django')
If you have previously used django-kombu, then there is no need
to recreate the tables, as the old tables will be fully compatible
@@ -582,8 +594,8 @@ News
The queue prefix can be set using the transport option
``queue_name_prefix``::
- BrokerTransport("SQS://", transport_options={
- "queue_name_prefix": "myapp"})
+ BrokerTransport('SQS://', transport_options={
+ 'queue_name_prefix': 'myapp'})
Contributed by Nitzan Miron.
@@ -592,12 +604,12 @@ News
Retry is enabled by the ``reply`` argument, and retry options
set by the ``retry_policy`` argument::
- exchange = Exchange("foo")
+ exchange = Exchange('foo')
producer.publish(message, exchange=exchange, retry=True,
declare=[exchange], retry_policy={
- "interval_start": 1.0})
+ 'interval_start': 1.0})
- See :meth:`~kombu.connection.BrokerConnection.ensure`
+ See :meth:`~kombu.connection.Connection.ensure`
for a list of supported retry policy options.
* ``Producer.publish`` now supports a ``declare`` keyword argument.
@@ -629,9 +641,9 @@ Fixes
>>> from kombu.serialization import registry
# by name
- >>> registry.disable("pickle")
+ >>> registry.disable('pickle')
# or by mime-type.
- >>> registry.disable("application/x-python-serialize")
+ >>> registry.disable('application/x-python-serialize')
.. _version-1.5.0:
@@ -656,8 +668,8 @@ Fixes
If wanted the dead-letter queue can still be enabled, by using
the ``deadletter_queue`` transport option::
- >>> x = BrokerConnection("redis://",
- ... transport_options={"deadletter_queue": "ae.undeliver"})
+ >>> x = Connection('redis://',
+ ... transport_options={'deadletter_queue': 'ae.undeliver'})
In addition, an :class:`UndeliverableWarning` is now emitted when
the dead-letter queue is enabled and a message ends up there.
@@ -730,7 +742,7 @@ Fixes
Fix contributed by Rafael Duran Castaneda
-* BrokerConnection.Consumer had the wrong signature.
+* Connection.Consumer had the wrong signature.
Fix contributed by Pavel Skvazh
@@ -778,12 +790,12 @@ Fixes
The connections default channel will then be used.
- In addition shortcut methods has been added to BrokerConnection::
+ In addition shortcut methods has been added to Connection::
>>> connection.Producer(exchange)
>>> connection.Consumer(queues=..., callbacks=...)
-* BrokerConnection has aquired a ``connected`` attribute that
+* Connection has aquired a ``connected`` attribute that
can be used to check if the connection instance has established
a connection.
@@ -858,11 +870,11 @@ Fixes
* Last release broke after fork for pool reinitialization.
* Producer/Consumer now has a ``connection`` attribute,
- giving access to the :class:`BrokerConnection` of the
+ giving access to the :class:`Connection` of the
instance.
* Pika: Channels now have access to the underlying
- :class:`BrokerConnection` instance using ``channel.connection.client``.
+ :class:`Connection` instance using ``channel.connection.client``.
This was previously required by the ``Simple`` classes and is now
also required by :class:`Consumer` and :class:`Producer`.
@@ -887,7 +899,7 @@ Fixes
for example the default broker is expressed as::
- >>> BrokerConnection("amqp://guest:guest@localhost:5672//")
+ >>> Connection('amqp://guest:guest@localhost:5672//')
Transport defaults to amqp, and is not required.
user, password, port and virtual_host is also not mandatory and
@@ -912,20 +924,20 @@ Fixes
* Now comes with default global connection and producer pools.
The acquire a connection using the connection parameters
- from a :class:`BrokerConnection`::
+ from a :class:`Connection`::
- >>> from kombu import BrokerConnection, connections
- >>> connection = BrokerConnection("amqp://guest:guest@localhost//")
+ >>> from kombu import Connection, connections
+ >>> connection = Connection('amqp://guest:guest@localhost//')
>>> with connections[connection].acquire(block=True):
... # do something with connection
To acquire a producer using the connection parameters
- from a :class:`BrokerConnection`::
+ from a :class:`Connection`::
- >>> from kombu import BrokerConnection, producers
- >>> connection = BrokerConnection("amqp://guest:guest@localhost//")
+ >>> from kombu import Connection, producers
+ >>> connection = Connection('amqp://guest:guest@localhost//')
>>> with producers[connection].acquire(block=True):
- ... producer.publish({"hello": "world"}, exchange="hello")
+ ... producer.publish({'hello': 'world'}, exchange='hello')
Acquiring a producer will in turn also acquire a connection
from the associated pool in ``connections``, so you the number
@@ -998,7 +1010,7 @@ Fixes
* Fixes MemoryError while importing ctypes on SELinux (Issue #52).
-* ``BrokerConnection.autoretry`` is a version of ``ensure`` that works
+* ``Connection.autoretry`` is a version of ``ensure`` that works
with arbitrary functions (i.e. it does not need an associated object
that implements the ``revive`` method.
@@ -1016,7 +1028,7 @@ Fixes
The connection will be established as needed.
-* ``BrokerConnection.ensure`` now supports an ``on_revive`` callback
+* ``Connection.ensure`` now supports an ``on_revive`` callback
that is applied whenever the connection is re-established.
* ``Consumer.consuming_from(queue)`` returns True if the Consumer is
@@ -1062,8 +1074,8 @@ Fixes
This can be disabled by setting the `supports_fanout` transport option:
- >>> BrokerConnection(transport="SQS",
- ... transport_options={"supports_fanout": False})
+ >>> Connection(transport='SQS',
+ ... transport_options={'supports_fanout': False})
* SQS: Now properly deletes a message when a message is acked.
@@ -1093,9 +1105,9 @@ Fixes
Usage:
- >>> conn = BrokerConnection(transport="SQS",
- ... userid=aws_access_key_id,
- ... password=aws_secret_access_key)
+ >>> conn = Connection(transport='SQS',
+ ... userid=aws_access_key_id,
+ ... password=aws_secret_access_key)
The environment variables :envvar:`AWS_ACCESS_KEY_ID` and
:envvar:`AWS_SECRET_ACCESS_KEY` are also supported.
@@ -1104,7 +1116,7 @@ Fixes
* amqplib transport: Now supports `login_method` for SSL auth.
- :class:`BrokerConnection` now supports the `login_method`
+ :class:`Connection` now supports the `login_method`
keyword argument.
Default `login_method` is ``AMQPLAIN``.
@@ -1186,8 +1198,8 @@ Important Notes
If you need to disable base64 encoding then you can do so
via the transport options::
- BrokerConnection(transport="...",
- transport_options={"body_encoding": None})
+ Connection(transport='...',
+ transport_options={'body_encoding': None})
**For transport authors**:
@@ -1212,20 +1224,20 @@ Important Notes
acquired next. I.e. if only a single thread is using the pool
this means only a single connection will ever be used.
-* BrokerConnection: Cloned connections did not inherit transport_options
+* Connection: Cloned connections did not inherit transport_options
(``__copy__``).
* contrib/requirements is now located in the top directory
of the distribution.
* MongoDB: Now supports authentication using the ``userid`` and ``password``
- arguments to :class:`BrokerConnection` (Issue #30).
+ arguments to :class:`Connection` (Issue #30).
-* BrokerConnection: Default autentication credentials are now delegated to
+* Connection: Default autentication credentials are now delegated to
the individual transports.
-
+
This means that the ``userid`` and ``password`` arguments to
- BrokerConnection is no longer *guest/guest* by default.
+ Connection is no longer *guest/guest* by default.
The amqplib and pika transports will still have the default
credentials.
@@ -1265,7 +1277,7 @@ Important Notes
* Redis: Creating a connection now ensures the connection is established.
- This means ``BrokerConnection.ensure_connection`` works properly with
+ This means ``Connection.ensure_connection`` works properly with
Redis.
* consumer_tag argument to ``Queue.consume`` can't be :const:`None`
@@ -1274,7 +1286,7 @@ Important Notes
A None value is now automatically converted to empty string.
An empty string will make the server generate a unique tag.
-* BrokerConnection now supports a ``transport_options`` argument.
+* Connection now supports a ``transport_options`` argument.
This can be used to pass additional arguments to transports.
@@ -1308,15 +1320,15 @@ Important Notes
**Example Usage**::
$ KOMBU_LOG_DEBUG=1 python
- >>> from kombu import BrokerConnection
- >>> conn = BrokerConnection()
+ >>> from kombu import Connection
+ >>> conn = Connection()
>>> channel = conn.channel()
Start from server, version: 8.0, properties:
{u'product': 'RabbitMQ',.............. }
Open OK! known_hosts []
using channel_id: 1
Channel open
- >>> channel.queue_declare("myq", passive=True)
+ >>> channel.queue_declare('myq', passive=True)
[Kombu channel:1] queue_declare('myq', passive=True)
(u'myq', 0, 1)
diff --git a/README.rst b/README.rst
index 57a5a96a..e9cb62db 100644
--- a/README.rst
+++ b/README.rst
@@ -124,22 +124,22 @@ Quick overview
::
- from kombu import BrokerConnection, Exchange, Queue
+ from kombu import Connection, Exchange, Queue
- media_exchange = Exchange("media", "direct", durable=True)
- video_queue = Queue("video", exchange=media_exchange, routing_key="video")
+ media_exchange = Exchange('media', 'direct', durable=True)
+ video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print body
message.ack()
# connections
- with BrokerConnection("amqp://guest:guest@localhost//") as conn:
+ with Connection('amqp://guest:guest@localhost//') as conn:
# produce
- with conn.Producer(serializer="json") as producer:
- producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013},
- exchange=media_exchange, routing_key="video",
+ with conn.Producer(serializer='json') as producer:
+ producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
+ exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
@@ -156,8 +156,8 @@ Quick overview
conn.drain_events()
# Consume from several queues on the same channel:
- video_queue = Queue("video", exchange=media_exchange, key="video")
- image_queue = Queue("image", exchange=media_exchange, key="image")
+ video_queue = Queue('video', exchange=media_exchange, key='video')
+ image_queue = Queue('image', exchange=media_exchange, key='image')
with connection.Consumer([video_queue, image_queue],
callbacks=[process_media]) as consumer:
@@ -175,9 +175,9 @@ Or handle channels manually::
All objects can be used outside of with statements too,
just remember to close the objects after use::
- from kombu import BrokerConnection, Consumer, Producer
+ from kombu import Connection, Consumer, Producer
- connection = BrokerConnection()
+ connection = Connection()
# ...
connection.close()
@@ -204,9 +204,9 @@ that connections default channel.
::
- >>> exchange = Exchange("tasks", "direct")
+ >>> exchange = Exchange('tasks', 'direct')
- >>> connection = BrokerConnection()
+ >>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()
diff --git a/docs/reference/kombu.connection.rst b/docs/reference/kombu.connection.rst
index 63c6b282..3b856bc5 100644
--- a/docs/reference/kombu.connection.rst
+++ b/docs/reference/kombu.connection.rst
@@ -10,7 +10,7 @@
Connection
----------
- .. autoclass:: BrokerConnection
+ .. autoclass:: Connection
.. admonition:: Attributes
@@ -44,8 +44,8 @@
.. seealso::
- The shortcut methods :meth:`BrokerConnection.Pool` and
- :meth:`BrokerConnection.ChannelPool` is the recommended way
+ The shortcut methods :meth:`Connection.Pool` and
+ :meth:`Connection.ChannelPool` is the recommended way
to instantiate these classes.
.. autoclass:: ConnectionPool
diff --git a/docs/reference/kombu.pidbox.rst b/docs/reference/kombu.pidbox.rst
index f47bfd4c..ebc72cd0 100644
--- a/docs/reference/kombu.pidbox.rst
+++ b/docs/reference/kombu.pidbox.rst
@@ -28,7 +28,7 @@
.. code-block:: python
- >>> connection = kombu.BrokerConnection()
+ >>> connection = kombu.Connection()
>>> state = {"beat": beat,
"connection": connection}
>>> consumer = mailbox(connection).Node(hostname).listen()
diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst
index a1d7b176..14ff4aad 100644
--- a/docs/userguide/connections.rst
+++ b/docs/userguide/connections.rst
@@ -15,12 +15,12 @@ and you can even create your own. The default transport is amqplib.
Create a connection using the default transport::
- >>> from kombu import BrokerConnection
- >>> connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ >>> from kombu import Connection
+ >>> connection = Connection('amqp://guest:guest@localhost:5672//')
The connection will not be established yet, as the connection is established
when needed. If you want to explicitly establish the connection
-you have to call the :meth:`~kombu.connection.BrokerConnection.connect`
+you have to call the :meth:`~kombu.connection.Connection.connect`
method::
>>> connection.connect()
@@ -49,7 +49,7 @@ Of course, the connection can be used as a context, and you are
encouraged to do so as it makes it harder to forget releasing open
resources::
- with BrokerConnection() as connection:
+ with Connection() as connection:
# work with connection
.. _connection-urls:
@@ -87,7 +87,7 @@ which is using the localhost host, default port, user name `guest`,
password `guest` and virtual host "/". A connection without arguments
is the same as::
- >>> BrokerConnection("amqp://guest:guest@localhost:5672//")
+ >>> Connection('amqp://guest:guest@localhost:5672//')
The default port is transport specific, for AMQP this is 5672.
@@ -100,7 +100,7 @@ the redis database number.
Keyword arguments
=================
-The :class:`BrokerConnection` class supports additional
+The :class:`~kombu.connection.Connection` class supports additional
keyword arguments, these are:
:hostname: Default host name if not provided in the URL.
diff --git a/docs/userguide/pools.rst b/docs/userguide/pools.rst
index 79c959ed..d921bc46 100644
--- a/docs/userguide/pools.rst
+++ b/docs/userguide/pools.rst
@@ -31,25 +31,25 @@ instance to support multiple connections in the same app.
All connection instances with the same connection parameters will
get the same pool::
- >>> from kombu import BrokerConnection
+ >>> from kombu import Connection
>>> from kombu.pools import connections
- >>> connections[BrokerConnection("redis://localhost:6379")]
+ >>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
- >>> connections[BrokerConnection("redis://localhost:6379")]
+ >>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
Let's acquire and release a connection:
.. code-block:: python
- from kombu import BrokerConnection
+ from kombu import Connection
from kombu.pools import connections
- connection = BrokerConnection("redis://localhost:6379")
+ connection = Connection('redis://localhost:6379')
with connections[connection].acquire(block=True) as conn:
- print("Got connection: %r" % (connection.as_uri(), ))
+ print('Got connection: %r' % (connection.as_uri(), ))
.. note::
@@ -69,11 +69,11 @@ at once you can do that too:
.. code-block:: python
- from kombu import BrokerConnection
+ from kombu import Connection
from kombu.pools import connections
- c1 = BrokerConnection("amqp://")
- c2 = BrokerConnection("redis://")
+ c1 = Connection('amqp://')
+ c2 = Connection('redis://')
with connections[c1].acquire(block=True) as conn1:
with connections[c2].acquire(block=True) as conn2:
@@ -93,27 +93,27 @@ to the ``news`` exchange:
.. code-block:: python
- from kombu import BrokerConnection, Exchange
+ from kombu import Connection, Exchange
from kombu.common import maybe_declare
from kombu.pools import producers
# The exchange we send our news articles to.
- news_exchange = Exchange("news")
+ news_exchange = Exchange('news')
# The article we want to send
- article = {"title": "No cellular coverage on the tube for 2012",
- "ingress": "yadda yadda yadda"}
+ article = {'title': 'No cellular coverage on the tube for 2012',
+ 'ingress': 'yadda yadda yadda'}
# The broker where our exchange is.
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ connection = Connection('amqp://guest:guest@localhost:5672//')
with producers[connection].acquire(block=True) as producer:
# maybe_declare knows what entities have already been declared
# so we don't have to do so multiple times in the same process.
maybe_declare(news_exchange)
- producer.publish(article, routing_key="domestic",
- serializer="json",
- compression="zlib")
+ producer.publish(article, routing_key='domestic',
+ serializer='json',
+ compression='zlib')
.. _default-pool-limits:
@@ -153,12 +153,12 @@ instances:
.. code-block:: python
from kombu import pools
- from kombu import BrokerConnection
+ from kombu import Connection
connections = pools.Connection(limit=100)
producers = pools.Producers(limit=connections.limit)
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ connection = Connection('amqp://guest:guest@localhost:5672//')
with connections[connection].acquire(block=True):
# ...
diff --git a/docs/userguide/simple.rst b/docs/userguide/simple.rst
index 650b8b28..81aeb641 100644
--- a/docs/userguide/simple.rst
+++ b/docs/userguide/simple.rst
@@ -18,10 +18,10 @@ two arguments, a connection channel and a name. The name is used as the
queue, exchange and routing key. If the need arises, you can specify
a :class:`~kombu.entity.Queue` as the name argument instead.
-In addition, the :class:`~kombu.connection.BrokerConnection` comes with
+In addition, the :class:`~kombu.connection.Connection` comes with
shortcuts to create simple queues using the current connection::
- >>> queue = connection.SimpleQueue("myqueue")
+ >>> queue = connection.SimpleQueue('myqueue')
>>> # ... do something with queue
>>> queue.close()
@@ -56,23 +56,23 @@ to produce and consume logging messages:
from socket import gethostname
from time import time
- from kombu import BrokerConnection
+ from kombu import Connection
class Logger(object):
- def __init__(self, connection, queue_name="log_queue",
- serializer="json", compression=None):
+ def __init__(self, connection, queue_name='log_queue',
+ serializer='json', compression=None):
self.queue = connection.SimpleQueue(self.queue_name)
self.serializer = serializer
self.compression = compression
- def log(self, message, level="INFO", context={}):
- self.queue.put({"message": message,
- "level": level,
- "context": context,
- "hostname": socket.gethostname(),
- "timestamp": time()},
+ def log(self, message, level='INFO', context={}):
+ self.queue.put({'message': message,
+ 'level': level,
+ 'context': context,
+ 'hostname': socket.gethostname(),
+ 'timestamp': time()},
serializer=self.serializer,
compression=self.compression)
@@ -87,28 +87,28 @@ to produce and consume logging messages:
self.queue.close()
- if __name__ == "__main__":
+ if __name__ == '__main__':
from contextlib import closing
- with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+ with Connection('amqp://guest:guest@localhost:5672//') as conn:
with closing(Logger(connection)) as logger:
# Send message
- logger.log("Error happened while encoding video",
- level="ERROR",
- context={"filename": "cutekitten.mpg"})
+ logger.log('Error happened while encoding video',
+ level='ERROR',
+ context={'filename': 'cutekitten.mpg'})
# Consume and process message
# This is the callback called when a log message is
# received.
def dump_entry(entry):
- date = datetime.fromtimestamp(entry["timestamp"])
- print("[%s %s %s] %s %r" % (date,
- entry["hostname"],
- entry["level"],
- entry["message"],
- entry["context"]))
+ date = datetime.fromtimestamp(entry['timestamp'])
+ print('[%s %s %s] %s %r' % (date,
+ entry['hostname'],
+ entry['level'],
+ entry['message'],
+ entry['context']))
# Process a single message using the callback above.
logger.process(dump_entry, n=1)
diff --git a/examples/complete_receive.py b/examples/complete_receive.py
index 416fa2d6..aa3987bf 100644
--- a/examples/complete_receive.py
+++ b/examples/complete_receive.py
@@ -2,15 +2,16 @@
Example of simple consumer that waits for a single message, acknowledges it
and exits.
"""
+
from __future__ import with_statement
-from kombu import Connection, Exchange, Queue
+from kombu import Connection, Exchange, Queue, Consumer, eventloop
from pprint import pformat
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
+exchange = Exchange('kombu_demo', type='direct')
+queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
def pretty(obj):
@@ -19,17 +20,23 @@ def pretty(obj):
#: This is the callback applied when a message is received.
def handle_message(body, message):
- print("Received message: %r" % (body, ))
- print(" properties:\n%s" % (pretty(message.properties), ))
- print(" delivery_info:\n%s" % (pretty(message.delivery_info), ))
+ print('Received message: %r' % (body, ))
+ print(' properties:\n%s' % (pretty(message.properties), ))
+ print(' delivery_info:\n%s' % (pretty(message.delivery_info), ))
message.ack()
-with Connection("amqp://guest:guest@localhost:5672//") as connection:
+#: Create a connection and a channel.
+#: If hostname, userid, password and virtual_host is not specified
+#: the values below are the default, but listed here so it can
+#: be easily changed.
+with Connection('amqp://guest:guest@localhost:5672//') as connection:
+
#: Create consumer using our callback and queue.
#: Second argument can also be a list to consume from
#: any number of queues.
- with connection.Consumer(queue, callbacks=[handle_message]):
+ with Consumer(connection, queue, callbacks=[handle_message]):
+
#: This waits for a single event. Note that this event may not
#: be a message, or a message that is to be delivered to the consumers
#: channel, but any event received on the connection.
- connection.drain_events(timeout=10)
+ eventloop(connection, limit=1, timeout=10.0)
diff --git a/examples/complete_receive_manual.py b/examples/complete_receive_manual.py
deleted file mode 100644
index bf618e46..00000000
--- a/examples/complete_receive_manual.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""
-Example of simple consumer that waits for a single message, acknowledges it
-and exits.
-"""
-
-from kombu import BrokerConnection, Exchange, Queue, Consumer
-from pprint import pformat
-
-#: By default messages sent to exchanges are persistent (delivery_mode=2),
-#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
-
-
-def pretty(obj):
- return pformat(obj, indent=4)
-
-
-#: This is the callback applied when a message is received.
-def handle_message(body, message):
- print("Received message: %r" % (body, ))
- print(" properties:\n%s" % (pretty(message.properties), ))
- print(" delivery_info:\n%s" % (pretty(message.delivery_info), ))
- message.ack()
-
-#: Create a connection and a channel.
-#: If hostname, userid, password and virtual_host is not specified
-#: the values below are the default, but listed here so it can
-#: be easily changed.
-connection = BrokerConnection(hostname="localhost",
- userid="guest",
- password="guest",
- virtual_host="/")
-channel = connection.channel()
-
-#: Create consumer using our callback and queue.
-#: Second argument can also be a list to consume from
-#: any number of queues.
-consumer = Consumer(channel, queue, callbacks=[handle_message])
-consumer.consume()
-
-#: This waits for a single event. Note that this event may not
-#: be a message, or a message that is to be delivered to the consumers
-#: channel, but any event received on the connection.
-connection.drain_events()
diff --git a/examples/complete_send.py b/examples/complete_send.py
index faa6a1a1..c81f8620 100644
--- a/examples/complete_send.py
+++ b/examples/complete_send.py
@@ -7,26 +7,26 @@ You can use `complete_receive.py` to receive the message sent.
"""
from __future__ import with_statement
-from kombu import Connection, Exchange, Queue
+from kombu import Connection, Producer, Exchange, Queue
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
+exchange = Exchange('kombu_demo', type='direct')
+queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
-with Connection("amqp://guest:guest@localhost:5672//") as connection:
+with Connection('amqp://guest:guest@localhost:5672//') as connection:
#: Producers are used to publish messages.
#: a default exchange and routing key can also be specifed
#: as arguments the Producer, but we rather specify this explicitly
#: at the publish call.
- producer = connection.Producer()
+ producer = Producer(connection)
#: Publish the message using the json serializer (which is the default),
#: and zlib compression. The kombu consumer will automatically detect
- #: encoding, serializiation and compression used and decode accordingly.
- producer.publish({"hello": "world"},
+ #: encoding, serialization and compression used and decode accordingly.
+ producer.publish({'hello': 'world'},
exchange=exchange,
- routing_key="kombu_demo",
- serializer="json", compression="zlib")
+ routing_key='kombu_demo',
+ serializer='json', compression='zlib')
diff --git a/examples/complete_send_manual.py b/examples/complete_send_manual.py
deleted file mode 100644
index 9e3b3252..00000000
--- a/examples/complete_send_manual.py
+++ /dev/null
@@ -1,35 +0,0 @@
-"""
-
-Example producer that sends a single message and exits.
-
-You can use `complete_receive.py` to receive the message sent.
-
-"""
-
-from kombu import BrokerConnection, Exchange, Queue, Producer
-
-#: By default messages sent to exchanges are persistent (delivery_mode=2),
-#: and queues and exchanges are durable.
-exchange = Exchange("kombu_demo", type="direct")
-queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
-
-
-#: Create connection and channel.
-#: If hostname, userid, password and virtual_host is not specified
-#: the values below are the default, but listed here so it can
-#: be easily changed.
-connection = BrokerConnection(hostname="localhost",
- userid="guest",
- password="guest",
- virtual_host="/")
-channel = connection.channel()
-
-#: Producers are used to publish messages.
-#: Routing keys can also be specifed as an argument to `publish`.
-producer = Producer(channel, exchange, routing_key="kombu_demo")
-
-#: Publish the message using the json serializer (which is the default),
-#: and zlib compression. The kombu consumer will automatically detect
-#: encoding, serializiation and compression used and decode accordingly.
-producer.publish({"hello": "world"}, serializer="json",
- compression="zlib")
diff --git a/examples/simple_eventlet_receive.py b/examples/simple_eventlet_receive.py
index cce518cc..a8208650 100644
--- a/examples/simple_eventlet_receive.py
+++ b/examples/simple_eventlet_receive.py
@@ -6,12 +6,13 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
+from __future__ import with_statement
+
import eventlet
-from eventlet import spawn
from Queue import Empty
-from kombu import BrokerConnection
+from kombu import Connection
eventlet.monkey_patch()
@@ -22,25 +23,21 @@ def wait_many(timeout=1):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
-
- #: SimpleQueue mimics the interface of the Python Queue module.
- #: First argument can either be a queue name or a kombu.Queue object.
- #: If a name, then the queue will be declared with the name as the queue
- #: name, exchange name and routing key.
- queue = connection.SimpleQueue("kombu_demo")
-
- while True:
- try:
- message = queue.get(block=False, timeout=timeout)
- except Empty:
- break
- else:
- spawn(message.ack)
- print(message.payload)
-
- queue.close()
- connection.close()
-
+ with Connection('amqp://guest:guest@localhost:5672//') as connection:
+
+ #: SimpleQueue mimics the interface of the Python Queue module.
+ #: First argument can either be a queue name or a kombu.Queue object.
+ #: If a name, then the queue will be declared with the name as the
+ #: queue name, exchange name and routing key.
+ with connection.SimpleQueue('kombu_demo') as queue:
+
+ while True:
+ try:
+ message = queue.get(block=False, timeout=timeout)
+ except Empty:
+ break
+ else:
+ message.ack()
+ print(message.payload)
spawn(wait_many).wait()
diff --git a/examples/simple_eventlet_send.py b/examples/simple_eventlet_send.py
index 3d0641e1..f6259bf3 100644
--- a/examples/simple_eventlet_send.py
+++ b/examples/simple_eventlet_send.py
@@ -6,9 +6,11 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
+from __future__ import with_statement
+
import eventlet
-from kombu import BrokerConnection
+from kombu import Connection
eventlet.monkey_patch()
@@ -19,24 +21,22 @@ def send_many(n):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ with Connection('amqp://guest:guest@localhost:5672//') as connection:
- #: SimpleQueue mimics the interface of the Python Queue module.
- #: First argument can either be a queue name or a kombu.Queue object.
- #: If a name, then the queue will be declared with the name as the queue
- #: name, exchange name and routing key.
- queue = connection.SimpleQueue("kombu_demo")
+ #: SimpleQueue mimics the interface of the Python Queue module.
+ #: First argument can either be a queue name or a kombu.Queue object.
+ #: If a name, then the queue will be declared with the name as the
+ #: queue name, exchange name and routing key.
+ with connection.SimpleQueue('kombu_demo') as queue:
- def send_message(i):
- queue.put({"hello": "world%s" % (i, )})
+ def send_message(i):
+ queue.put({'hello': 'world%s' % (i, )})
- pool = eventlet.GreenPool(10)
- for i in xrange(n):
- pool.spawn(send_message, i)
- pool.waitall()
+ pool = eventlet.GreenPool(10)
+ for i in xrange(n):
+ pool.spawn(send_message, i)
+ pool.waitall()
- queue.close()
- connection.close()
-if __name__ == "__main__":
+if __name__ == '__main__':
send_many(10)
diff --git a/examples/simple_receive.py b/examples/simple_receive.py
index 9d53aa35..e9b933b1 100644
--- a/examples/simple_receive.py
+++ b/examples/simple_receive.py
@@ -4,19 +4,19 @@ Example receiving a message using the SimpleQueue interface.
from __future__ import with_statement
-from kombu import BrokerConnection
+from kombu import Connection
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
-with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+with Connection('amqp://guest:guest@localhost:5672//') as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
- with conn.SimpleQueue("kombu_demo") as queue:
+ with conn.SimpleQueue('kombu_demo') as queue:
message = queue.get(block=True, timeout=10)
message.ack()
print(message.payload)
diff --git a/examples/simple_send.py b/examples/simple_send.py
index 76cdb9b5..e76fbcf0 100644
--- a/examples/simple_send.py
+++ b/examples/simple_send.py
@@ -8,20 +8,20 @@ message sent.
"""
from __future__ import with_statement
-from kombu import BrokerConnection
+from kombu import Connection
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
-with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+with Connection('amqp://guest:guest@localhost:5672//') as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
- with conn.SimpleQueue("kombu_demo") as queue:
- queue.put({"hello": "world"}, serializer="json", compression="zlib")
+ with conn.SimpleQueue('kombu_demo') as queue:
+ queue.put({'hello': 'world'}, serializer='json', compression='zlib')
#####
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py
index 1ab6175a..e07b8c45 100644
--- a/examples/simple_task_queue/client.py
+++ b/examples/simple_task_queue/client.py
@@ -5,25 +5,25 @@ from kombu.pools import producers
from queues import task_exchange
-priority_to_routing_key = {"high": "hipri",
- "mid": "midpri",
- "low": "lopri"}
+priority_to_routing_key = {'high': 'hipri',
+ 'mid': 'midpri',
+ 'low': 'lopri'}
-def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"):
- payload = {"fun": fun, "args": args, "kwargs": kwargs}
+def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
+ payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
- producer.publish(payload, serializer="pickle",
- compression="bzip2",
+ producer.publish(payload, serializer='pickle',
+ compression='bzip2',
routing_key=routing_key)
-if __name__ == "__main__":
- from kombu import BrokerConnection
+if __name__ == '__main__':
+ from kombu import Connection
from tasks import hello_task
- connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
- send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={},
- priority="high")
+ connection = Connection('amqp://guest:guest@localhost:5672//')
+ send_as_task(connection, fun=hello_task, args=('Kombu', ), kwargs={},
+ priority='high')
diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py
index 680e7575..602c2b0e 100644
--- a/examples/simple_task_queue/queues.py
+++ b/examples/simple_task_queue/queues.py
@@ -1,6 +1,6 @@
from kombu import Exchange, Queue
-task_exchange = Exchange("tasks", type="direct")
-task_queues = [Queue("hipri", task_exchange, routing_key="hipri"),
- Queue("midpri", task_exchange, routing_key="midpri"),
- Queue("lopri", task_exchange, routing_key="lopri")]
+task_exchange = Exchange('tasks', type='direct')
+task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
+ Queue('midpri', task_exchange, routing_key='midpri'),
+ Queue('lopri', task_exchange, routing_key='lopri')]
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index 063a6b4d..3d933c4b 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -16,23 +16,23 @@ class Worker(ConsumerMixin):
callbacks=[self.process_task])]
def process_task(self, body, message):
- fun = body["fun"]
- args = body["args"]
- kwargs = body["kwargs"]
- self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
+ fun = body['fun']
+ args = body['args']
+ kwargs = body['kwargs']
+ self.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
try:
fun(*args, **kwdict(kwargs))
except Exception, exc:
- self.error("task raised exception: %r", exc)
+ self.error('task raised exception: %r', exc)
message.ack()
-if __name__ == "__main__":
- from kombu import BrokerConnection
+if __name__ == '__main__':
+ from kombu import Connection
from kombu.utils.debug import setup_logging
- setup_logging(loglevel="INFO")
+ setup_logging(loglevel='INFO')
- with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+ with Connection('amqp://guest:guest@localhost:5672//') as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
- print("bye bye")
+ print('bye bye')
diff --git a/funtests/transport.py b/funtests/transport.py
index 76a0e010..5924a72a 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -9,7 +9,7 @@ import weakref
from nose import SkipTest
-from kombu import BrokerConnection
+from kombu import Connection
from kombu import Exchange, Queue
from kombu.tests.utils import skip_if_quick
@@ -104,7 +104,7 @@ class TransportCase(unittest.TestCase):
options.setdefault("userid", self.userid)
if self.password:
options.setdefault("password", self.password)
- return BrokerConnection(transport=self.transport, **options)
+ return Connection(transport=self.transport, **options)
def do_connect(self):
self.connection = self.get_connection(**self.connection_options)
diff --git a/kombu/abstract.py b/kombu/abstract.py
index daa4077f..97a50103 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -85,7 +85,7 @@ class MaybeChannelBound(Object):
def revive(self, channel):
"""Revive channel after the connection has been re-established.
- Used by :meth:`~kombu.connection.BrokerConnection.ensure`.
+ Used by :meth:`~kombu.Connection.ensure`.
"""
if self.is_bound:
diff --git a/kombu/common.py b/kombu/common.py
index 4ecfc2af..42bf0a67 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -26,7 +26,8 @@ from .utils import uuid
__all__ = ['Broadcast', 'maybe_declare', 'uuid',
'itermessages', 'send_reply', 'isend_reply',
- 'collect_replies', 'insured', 'ipublish']
+ 'collect_replies', 'insured', 'ipublish', 'drain_consumer',
+ 'eventloop']
insured_logger = Log('kombu.insurance')
diff --git a/kombu/connection.py b/kombu/connection.py
index 03fdc2eb..ff049c81 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -32,14 +32,13 @@ from .utils.url import parse_url
_LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False)
_LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False)
-__all__ = ['parse_url', 'BrokerConnection', 'Resource',
- 'ConnectionPool', 'ChannelPool']
+__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']
URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy'])
logger = get_logger(__name__)
-class BrokerConnection(object):
+class Connection(object):
"""A connection to the broker.
:param URL: Connection URL.
@@ -552,7 +551,7 @@ class BrokerConnection(object):
def __repr__(self):
"""``x.__repr__() <==> repr(x)``"""
- return '<BrokerConnection: %s at 0x%x>' % (self.as_uri(), id(self))
+ return '<Connection: %s at 0x%x>' % (self.as_uri(), id(self))
def __copy__(self):
"""``x.__copy__() <==> copy(x)``"""
@@ -634,7 +633,7 @@ class BrokerConnection(object):
@property
def is_evented(self):
return getattr(self.transport, 'on_poll_start', None)
-Connection = BrokerConnection
+BrokerConnection = Connection
class Resource(object):
@@ -851,6 +850,6 @@ class ChannelPool(Resource):
def maybe_channel(channel):
"""Returns channel, or returns the default_channel if it's a
connection."""
- if isinstance(channel, BrokerConnection):
+ if isinstance(channel, Connection):
return channel.default_channel
return channel
diff --git a/kombu/mixins.py b/kombu/mixins.py
index 25168b03..50f1d499 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -33,7 +33,7 @@ class ConsumerMixin(LogMixin):
(eventlet/gevent) too.
The basic class would need a :attr:`connection` attribute
- which must be a :class:`~kombu.connection.BrokerConnection` instance,
+ which must be a :class:`~kombu.Connection` instance,
and define a :meth:`get_consumers` method that returns a list
of :class:`kombu.messaging.Consumer` instances to use.
Supporting multiple consumers is important so that multiple
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index eb464855..ae8a726f 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -5,8 +5,8 @@ import pickle
from nose import SkipTest
-from kombu.connection import BrokerConnection, Resource, parse_url
-from kombu.messaging import Consumer, Producer
+from kombu import Connection, Consumer, Producer, parse_url
+from kombu.connection import Resource
from .mocks import Transport
from .utils import TestCase
@@ -36,7 +36,7 @@ class test_connection_utils(TestCase):
self.assertEqual(result['hostname'], 'example.com/')
def test_parse_generated_as_uri(self):
- conn = BrokerConnection(self.url)
+ conn = Connection(self.url)
info = conn.info()
for k, v in self.expected.items():
self.assertEqual(info[k], v)
@@ -46,12 +46,12 @@ class test_connection_utils(TestCase):
@skip_if_not_module('pymongo')
def test_as_uri_when_mongodb(self):
- x = BrokerConnection('mongodb://localhost')
+ x = Connection('mongodb://localhost')
self.assertTrue(x.as_uri())
def test_bogus_scheme(self):
with self.assertRaises(KeyError):
- BrokerConnection('bogus://localhost:7421').transport
+ Connection('bogus://localhost:7421').transport
def assert_info(self, conn, **fields):
info = conn.info()
@@ -60,79 +60,77 @@ class test_connection_utils(TestCase):
def test_rabbitmq_example_urls(self):
# see Appendix A of http://www.rabbitmq.com/uri-spec.html
- C = BrokerConnection
self.assert_info(
- C('amqp://user:pass@host:10000/vhost'),
+ Connection('amqp://user:pass@host:10000/vhost'),
userid='user', password='pass', hostname='host',
port=10000, virtual_host='vhost')
self.assert_info(
- C('amqp://user%61:%61pass@ho%61st:10000/v%2fhost'),
+ Connection('amqp://user%61:%61pass@ho%61st:10000/v%2fhost'),
userid='usera', password='apass',
hostname='hoast', port=10000,
virtual_host='v/host')
self.assert_info(
- C('amqp://'),
+ Connection('amqp://'),
userid='guest', password='guest',
hostname='localhost', port=5672,
virtual_host='/')
self.assert_info(
- C('amqp://:@/'),
+ Connection('amqp://:@/'),
userid='guest', password='guest',
hostname='localhost', port=5672,
virtual_host='/')
self.assert_info(
- C('amqp://user@/'),
+ Connection('amqp://user@/'),
userid='user', password='guest',
hostname='localhost', port=5672,
virtual_host='/')
self.assert_info(
- C('amqp://user:pass@/'),
+ Connection('amqp://user:pass@/'),
userid='user', password='pass',
hostname='localhost', port=5672,
virtual_host='/')
self.assert_info(
- C('amqp://host'),
+ Connection('amqp://host'),
userid='guest', password='guest',
hostname='host', port=5672,
virtual_host='/')
self.assert_info(
- C('amqp://:10000'),
+ Connection('amqp://:10000'),
userid='guest', password='guest',
hostname='localhost', port=10000,
virtual_host='/')
self.assert_info(
- C('amqp:///vhost'),
+ Connection('amqp:///vhost'),
userid='guest', password='guest',
hostname='localhost', port=5672,
virtual_host='vhost')
self.assert_info(
- C('amqp://host/'),
+ Connection('amqp://host/'),
userid='guest', password='guest',
hostname='host', port=5672,
virtual_host='/')
self.assert_info(
- C('amqp://host/%2f'),
+ Connection('amqp://host/%2f'),
userid='guest', password='guest',
hostname='host', port=5672,
virtual_host='/')
def test_url_IPV6(self):
- C = BrokerConnection
raise SkipTest("urllib can't parse ipv6 urls")
self.assert_info(
- C('amqp://[::1]'),
+ Connection('amqp://[::1]'),
userid='guest', password='guest',
hostname='[::1]', port=5672,
virtual_host='/')
@@ -141,7 +139,7 @@ class test_connection_utils(TestCase):
class test_Connection(TestCase):
def setUp(self):
- self.conn = BrokerConnection(port=5672, transport=Transport)
+ self.conn = Connection(port=5672, transport=Transport)
def test_establish_connection(self):
conn = self.conn
@@ -177,7 +175,7 @@ class test_Connection(TestCase):
def close_connection(self, connection):
raise _CustomError('foo')
- conn = BrokerConnection(transport=MyTransport)
+ conn = Connection(transport=MyTransport)
conn.connect()
conn.close()
self.assertTrue(conn._closed)
@@ -190,7 +188,7 @@ class test_Connection(TestCase):
def test_close_when_default_channel_close_raises(self):
- class Conn(BrokerConnection):
+ class Conn(Connection):
@property
def connection_errors(self):
@@ -299,7 +297,7 @@ class test_Connection(TestCase):
class MyTransport(Transport):
channel_errors = (KeyError, ValueError)
- conn = BrokerConnection(transport=MyTransport)
+ conn = Connection(transport=MyTransport)
self.assertTupleEqual(conn.channel_errors, (KeyError, ValueError))
def test_connection_errors(self):
@@ -307,7 +305,7 @@ class test_Connection(TestCase):
class MyTransport(Transport):
connection_errors = (KeyError, ValueError)
- conn = BrokerConnection(transport=MyTransport)
+ conn = Connection(transport=MyTransport)
self.assertTupleEqual(conn.connection_errors, (KeyError, ValueError))
@@ -316,8 +314,8 @@ class test_Connection_with_transport_options(TestCase):
transport_options = {'pool_recycler': 3600, 'echo': True}
def setUp(self):
- self.conn = BrokerConnection(port=5672, transport=Transport,
- transport_options=self.transport_options)
+ self.conn = Connection(port=5672, transport=Transport,
+ transport_options=self.transport_options)
def test_establish_connection(self):
conn = self.conn
@@ -433,8 +431,7 @@ class test_ConnectionPool(ResourceCase):
abstract = False
def create_resource(self, limit, preload):
- return BrokerConnection(port=5672, transport=Transport) \
- .Pool(limit, preload)
+ return Connection(port=5672, transport=Transport).Pool(limit, preload)
def test_setup(self):
P = self.create_resource(10, 2)
@@ -450,7 +447,7 @@ class test_ConnectionPool(ResourceCase):
def test_prepare_not_callable(self):
P = self.create_resource(None, None)
- conn = BrokerConnection('memory://')
+ conn = Connection('memory://')
self.assertIs(P.prepare(conn), conn)
def test_acquire_channel(self):
@@ -463,7 +460,7 @@ class test_ChannelPool(ResourceCase):
abstract = False
def create_resource(self, limit, preload):
- return BrokerConnection(port=5672, transport=Transport) \
+ return Connection(port=5672, transport=Transport) \
.ChannelPool(limit, preload)
def test_setup(self):
@@ -481,6 +478,6 @@ class test_ChannelPool(ResourceCase):
def test_prepare_not_callable(self):
P = self.create_resource(10, 0)
- conn = BrokerConnection('memory://')
+ conn = Connection('memory://')
chan = conn.default_channel
self.assertIs(P.prepare(chan), chan)
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 783999a8..bb9f4610 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -5,7 +5,7 @@ import anyjson
from mock import patch
-from kombu.connection import BrokerConnection
+from kombu import Connection
from kombu.exceptions import MessageStateError
from kombu.messaging import Consumer, Producer
from kombu.entity import Exchange, Queue
@@ -19,7 +19,7 @@ class test_Producer(TestCase):
def setUp(self):
self.exchange = Exchange('foo', 'direct')
- self.connection = BrokerConnection(transport=Transport)
+ self.connection = Connection(transport=Transport)
self.connection.connect()
self.assertTrue(self.connection.connection.connected)
self.assertFalse(self.exchange.is_bound)
@@ -124,7 +124,7 @@ class test_Producer(TestCase):
def test_revive_when_channel_is_connection(self):
p = self.connection.Producer()
p.exchange = Mock()
- new_conn = BrokerConnection('memory://')
+ new_conn = Connection('memory://')
defchan = new_conn.default_channel
p.revive(new_conn)
@@ -188,7 +188,7 @@ class test_Producer(TestCase):
class test_Consumer(TestCase):
def setUp(self):
- self.connection = BrokerConnection(transport=Transport)
+ self.connection = Connection(transport=Transport)
self.connection.connect()
self.assertTrue(self.connection.connection.connected)
self.exchange = Exchange('foo', 'direct')
diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py
index 72d974ea..6199efa7 100644
--- a/kombu/tests/test_pidbox.py
+++ b/kombu/tests/test_pidbox.py
@@ -3,8 +3,8 @@ from __future__ import with_statement
import socket
+from kombu import Connection
from kombu import pidbox
-from kombu.connection import BrokerConnection
from kombu.utils import uuid
from .utils import TestCase
@@ -24,7 +24,7 @@ class test_Mailbox(TestCase):
return 'COLLECTED'
self.mailbox = Mailbox('test_pidbox')
- self.connection = BrokerConnection(transport='memory')
+ self.connection = Connection(transport='memory')
self.state = {'var': 1}
self.handlers = {'mymethod': self._handler}
self.bound = self.mailbox(self.connection)
diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py
index 7580e650..26b09080 100644
--- a/kombu/tests/test_simple.py
+++ b/kombu/tests/test_simple.py
@@ -3,7 +3,7 @@ from __future__ import with_statement
from Queue import Empty
-from kombu import BrokerConnection, Exchange, Queue
+from kombu import Connection, Exchange, Queue
from .utils import TestCase
from .utils import Mock
@@ -25,7 +25,7 @@ class SimpleBase(TestCase):
def setUp(self):
if not self.abstract:
- self.connection = BrokerConnection(transport='memory')
+ self.connection = Connection(transport='memory')
self.q = self.Queue(None, no_ack=True)
def tearDown(self):
diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py
index 803af460..6d40a001 100644
--- a/kombu/tests/transport/test_amqplib.py
+++ b/kombu/tests/transport/test_amqplib.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
import sys
from kombu.transport import amqplib
-from kombu.connection import BrokerConnection
+from kombu.connection import Connection
from kombu.tests.utils import TestCase
from kombu.tests.utils import mask_modules, Mock
@@ -71,7 +71,7 @@ class test_Channel(TestCase):
class test_Transport(TestCase):
def setUp(self):
- self.connection = BrokerConnection('amqplib://')
+ self.connection = Connection('amqplib://')
self.transport = self.connection.transport
def test_create_channel(self):
@@ -134,7 +134,7 @@ class test_amqplib(TestCase):
class Transport(amqplib.Transport):
Connection = MockConnection
- c = BrokerConnection(port=None, transport=Transport).connect()
+ c = Connection(port=None, transport=Transport).connect()
self.assertEqual(c['host'],
'127.0.0.1:%s' % (Transport.default_port, ))
@@ -143,5 +143,5 @@ class test_amqplib(TestCase):
class Transport(amqplib.Transport):
Connection = MockConnection
- c = BrokerConnection(port=1337, transport=Transport).connect()
+ c = Connection(port=1337, transport=Transport).connect()
self.assertEqual(c['host'], '127.0.0.1:1337')
diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py
index 691e132c..e369449f 100644
--- a/kombu/tests/transport/test_base.py
+++ b/kombu/tests/transport/test_base.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from __future__ import with_statement
-from kombu import BrokerConnection, Consumer, Producer, Queue
+from kombu import Connection, Consumer, Producer, Queue
from kombu.transport.base import Message, StdChannel, Transport
from kombu.tests.utils import TestCase
@@ -11,7 +11,7 @@ from kombu.tests.utils import Mock
class test_StdChannel(TestCase):
def setUp(self):
- self.conn = BrokerConnection('memory://')
+ self.conn = Connection('memory://')
self.channel = self.conn.channel()
self.channel.queues.clear()
self.conn.connection.state.clear()
@@ -40,7 +40,7 @@ class test_StdChannel(TestCase):
class test_Message(TestCase):
def setUp(self):
- self.conn = BrokerConnection('memory://')
+ self.conn = Connection('memory://')
self.channel = self.conn.channel()
self.message = Message(self.channel, delivery_tag=313)
diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py
index 2865d6a4..306a9e02 100644
--- a/kombu/tests/transport/test_memory.py
+++ b/kombu/tests/transport/test_memory.py
@@ -3,7 +3,7 @@ from __future__ import with_statement
import socket
-from kombu.connection import BrokerConnection
+from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer, Producer
@@ -13,7 +13,7 @@ from kombu.tests.utils import TestCase
class test_MemoryTransport(TestCase):
def setUp(self):
- self.c = BrokerConnection(transport='memory')
+ self.c = Connection(transport='memory')
self.e = Exchange('test_transport_memory')
self.q = Queue('test_transport_memory',
exchange=self.e,
diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py
index 1d4145be..b4aa7618 100644
--- a/kombu/tests/transport/test_mongodb.py
+++ b/kombu/tests/transport/test_mongodb.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from nose import SkipTest
-from kombu.connection import BrokerConnection
+from kombu.connection import Connection
from kombu.tests.utils import TestCase, skip_if_not_module
@@ -27,23 +27,23 @@ class test_mongodb(TestCase):
Connection = MockConnection
url = 'mongodb://'
- c = BrokerConnection(url, transport=Transport).connect()
+ c = Connection(url, transport=Transport).connect()
client = c.channels[0].client
self.assertEquals(client.name, 'kombu_default')
self.assertEquals(client.connection.host, '127.0.0.1')
url = 'mongodb://localhost'
- c = BrokerConnection(url, transport=Transport).connect()
+ c = Connection(url, transport=Transport).connect()
client = c.channels[0].client
self.assertEquals(client.name, 'kombu_default')
url = 'mongodb://localhost/dbname'
- c = BrokerConnection(url, transport=Transport).connect()
+ c = Connection(url, transport=Transport).connect()
client = c.channels[0].client
self.assertEquals(client.name, 'dbname')
url = 'mongodb://localhost,example.org:29017/dbname'
- c = BrokerConnection(url, transport=Transport).connect()
+ c = Connection(url, transport=Transport).connect()
client = c.channels[0].client
nodes = client.connection.nodes
@@ -53,15 +53,15 @@ class test_mongodb(TestCase):
# Passing options breaks kombu's _init_params method
# url = 'mongodb://localhost,localhost2:29017/dbname?safe=true'
- # c = BrokerConnection(url, transport=Transport).connect()
+ # c = Connection(url, transport=Transport).connect()
# client = c.channels[0].client
url = 'mongodb://localhost:27017,localhost2:29017/dbname'
- c = BrokerConnection(url, transport=Transport).connect()
+ c = Connection(url, transport=Transport).connect()
client = c.channels[0].client
url = 'mongodb://username:password@localhost/dbname'
- c = BrokerConnection(url, transport=Transport).connect()
+ c = Connection(url, transport=Transport).connect()
# Assuming there's no user 'username' with password 'password'
# configured in mongodb
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 3d9fb361..90d4d8b4 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -9,7 +9,7 @@ from collections import defaultdict
from itertools import count
from Queue import Empty, Queue as _Queue
-from kombu.connection import BrokerConnection
+from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.messaging import Consumer, Producer
@@ -217,7 +217,7 @@ class Transport(redis.Transport):
class test_Channel(TestCase):
def setUp(self):
- self.connection = BrokerConnection(transport=Transport)
+ self.connection = Connection(transport=Transport)
self.channel = self.connection.channel()
def test_basic_consume_when_fanout_queue(self):
@@ -468,7 +468,7 @@ class test_Channel(TestCase):
class test_Redis(TestCase):
def setUp(self):
- self.connection = BrokerConnection(transport=Transport)
+ self.connection = Connection(transport=Transport)
self.exchange = Exchange('test_Redis', type='direct')
self.queue = Queue('test_Redis', self.exchange, 'test_Redis')
@@ -489,7 +489,7 @@ class test_Redis(TestCase):
self.assertIsNone(self.queue(channel).get())
def test_publish__consume(self):
- connection = BrokerConnection(transport=Transport)
+ connection = Connection(transport=Transport)
channel = connection.channel()
producer = Producer(channel, self.exchange, routing_key='test_Redis')
consumer = Consumer(channel, self.queue)
@@ -526,45 +526,45 @@ class test_Redis(TestCase):
channel.close()
def test_db_values(self):
- c1 = BrokerConnection(virtual_host=1,
+ c1 = Connection(virtual_host=1,
transport=Transport).channel()
self.assertEqual(c1.client.db, 1)
- c2 = BrokerConnection(virtual_host='1',
+ c2 = Connection(virtual_host='1',
transport=Transport).channel()
self.assertEqual(c2.client.db, 1)
- c3 = BrokerConnection(virtual_host='/1',
+ c3 = Connection(virtual_host='/1',
transport=Transport).channel()
self.assertEqual(c3.client.db, 1)
with self.assertRaises(Exception):
- BrokerConnection(virtual_host='/foo',
- transport=Transport).channel()
+ Connection(virtual_host='/foo',
+ transport=Transport).channel()
def test_db_port(self):
- c1 = BrokerConnection(port=None, transport=Transport).channel()
+ c1 = Connection(port=None, transport=Transport).channel()
self.assertEqual(c1.client.port, Transport.default_port)
c1.close()
- c2 = BrokerConnection(port=9999, transport=Transport).channel()
+ c2 = Connection(port=9999, transport=Transport).channel()
self.assertEqual(c2.client.port, 9999)
c2.close()
def test_close_poller_not_active(self):
- c = BrokerConnection(transport=Transport).channel()
+ c = Connection(transport=Transport).channel()
cycle = c.connection.cycle
c.client.connection
c.close()
self.assertNotIn(c, cycle._channels)
def test_close_ResponseError(self):
- c = BrokerConnection(transport=Transport).channel()
+ c = Connection(transport=Transport).channel()
c.client.bgsave_raises_ResponseError = True
c.close()
def test_close_disconnects(self):
- c = BrokerConnection(transport=Transport).channel()
+ c = Connection(transport=Transport).channel()
conn1 = c.client.connection
conn2 = c.subclient.connection
c.close()
@@ -583,7 +583,7 @@ class test_Redis(TestCase):
@module_exists(myredis, exceptions)
def _do_test():
- conn = BrokerConnection(transport=Transport)
+ conn = Connection(transport=Transport)
chan = conn.channel()
self.assertTrue(chan.Client)
self.assertTrue(chan.ResponseError)
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index d4a5fe3f..e08c4c14 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -5,7 +5,7 @@ import warnings
from mock import patch
-from kombu.connection import BrokerConnection
+from kombu import Connection
from kombu.exceptions import StdChannelError
from kombu.transport import virtual
from kombu.utils import uuid
@@ -16,12 +16,11 @@ from kombu.tests.utils import Mock, redirect_stdouts
def client(**kwargs):
- return BrokerConnection(transport='kombu.transport.virtual.Transport',
- **kwargs)
+ return Connection(transport='kombu.transport.virtual.Transport', **kwargs)
def memory_client():
- return BrokerConnection(transport='memory')
+ return Connection(transport='memory')
class test_BrokerState(TestCase):
diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py
index 50498021..d1b2a85a 100644
--- a/kombu/tests/transport/virtual/test_exchange.py
+++ b/kombu/tests/transport/virtual/test_exchange.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from __future__ import with_statement
-from kombu import BrokerConnection
+from kombu import Connection
from kombu.transport.virtual import exchange
from kombu.tests.mocks import Transport
@@ -14,8 +14,7 @@ class ExchangeCase(TestCase):
def setUp(self):
if self.type:
- self.e = self.type(BrokerConnection(transport=Transport)
- .channel())
+ self.e = self.type(Connection(transport=Transport).channel())
class test_Direct(ExchangeCase):
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 8ae44419..bc934a84 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -177,7 +177,7 @@ class Transport(object):
"""Base class for transports."""
Management = Management
- #: The :class:`~kombu.connection.BrokerConnection` owning this instance.
+ #: The :class:`~kombu.Connection` owning this instance.
client = None
#: Default port used when no port has been specified.
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 22d345af..d08aa757 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -656,7 +656,7 @@ class Management(base.Management):
class Transport(base.Transport):
"""Virtual transport.
- :param client: :class:`~kombu.connection.BrokerConnection` instance
+ :param client: :class:`~kombu.Connection` instance
"""
Channel = Channel