summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsif Saif Uddin (Auvi) <auvipy@gmail.com>2020-06-03 09:58:20 +0600
committerAsif Saif Uddin (Auvi) <auvipy@gmail.com>2020-06-03 09:58:20 +0600
commit0f4a2af4a8767b7e0a4da46f65cc8e7ffd829f9b (patch)
treed9a1ed11973b2baa6e5469c487e0c757ea9ab092
parentb858bfb97bdfd5a790bb8435ef1dab7672b4530f (diff)
parentff7c1e304177e9eacc032fd0b875fe7ac0e6229e (diff)
downloadkombu-0f4a2af4a8767b7e0a4da46f65cc8e7ffd829f9b.tar.gz
Merge branch 'master' of https://github.com/celery/kombu
-rw-r--r--docs/userguide/consumers.rst81
-rw-r--r--kombu/connection.py5
-rw-r--r--t/unit/test_connection.py6
3 files changed, 77 insertions, 15 deletions
diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst
index cb943b7b..fca4c0ce 100644
--- a/docs/userguide/consumers.rst
+++ b/docs/userguide/consumers.rst
@@ -9,7 +9,7 @@
Basics
======
-The :class:`Consumer` takes a connection (or channel) and a list of queues to
+The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to
consume from. Several consumers can be mixed to consume from different
channels, as they all bind to the same connection, and ``drain_events`` will
drain events from all channels on that connection.
@@ -22,30 +22,85 @@ drain events from all channels on that connection.
.. code-block:: python
- Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
+ >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
+You can create a consumer using a Connection. Consumer is consuming from single queue with name `'queue'`:
-Draining events from a single consumer:
+.. code-block:: python
+
+ >>> queue = Queue('queue', routing_key='queue')
+ >>> consumer = connection.Consumer(queue)
+
+You can also instantiate Consumer directly, it takes a channel or a connection as an argument. This consumer also
+consumes from single queue with name `'queue'`:
+
+.. code-block:: python
+
+ >>> queue = Queue('queue', routing_key='queue')
+ >>> with Connection('amqp://') as conn:
+ ... with conn.channel() as channel:
+ ... consumer = Consumer(channel, queue)
+
+Consumer needs to specify handler of received data. This handler specified in form of callback. Callback function is called
+by kombu library every time a new message is received. Callback is called with two parameters ``body`` containing deserialized
+data sent by producer and :class:`~kombu.message.Message` instance ``message``. User is also responsible for acknowledging of message when manual
+acknowledge is set.
+
+.. code-block:: python
+
+ >>> def callback(body, message):
+ ... print(body)
+ ... message.ack()
+
+ >>> consumer.register_callback(callback)
+
+Draining events from a single consumer. Method ``drain_events`` by default blocks indefinitely. This example sets timeout to 1 second:
.. code-block:: python
- with Consumer(connection, queues, accept=['json']):
- connection.drain_events(timeout=1)
+ >>> with consumer:
+ ... connection.drain_events(timeout=1)
+Draining events from several consumers. Each consumer has its own list of queues. Each consumer accepts `'json'` format of data:
+
+.. code-block:: python
-Draining events from several consumers:
+ >>> from kombu.utils.compat import nested
+
+ >>> queues1 = [Queue('queue11', routing_key='queue12')]
+ >>> queues2 = [Queue('queue21', routing_key='queue22')]
+ >>> with connection.channel(), connection.channel() as (channel1, channel2):
+ ... with nested(Consumer(channel1, queues1, accept=['json']),
+ ... Consumer(channel2, queues2, accept=['json'])):
+ ... connection.drain_events(timeout=1)
+
+The full example will look as follows:
.. code-block:: python
- from kombu.utils.compat import nested
+ from kombu import Connection, Consumer, Queue
+
+ def callback(body, message):
+ print('RECEIVED MESSAGE: {0!r}'.format(body))
+ message.ack()
+
+ queue1 = Queue('queue1', routing_key='queue1')
+ queue2 = Queue('queue2', routing_key='queue2')
- with connection.channel(), connection.channel() as (channel1, channel2):
- with nested(Consumer(channel1, queues1, accept=['json']),
- Consumer(channel2, queues2, accept=['json'])):
- connection.drain_events(timeout=1)
+ with Connection('amqp://') as conn:
+ with conn.channel() as channel:
+ consumer = Consumer(conn, [queue1, queue2], accept=['json'])
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+Consumer mixin classes
+======================
-Or using :class:`~kombu.mixins.ConsumerMixin`:
+Kombu provides predefined mixin classes in module :py:mod:`~kombu.mixins`. It contains two classes:
+:class:`~kombu.mixins.ConsumerMixin` for creating consumers and :class:`~kombu.mixins.ConsumerProducerMixin`
+for creating consumers supporting also publishing messages. Consumers can be created just by subclassing
+mixin class and overriding some of the methods:
.. code-block:: python
@@ -97,7 +152,7 @@ and with multiple channels again:
C(connection).run()
-There's also a :class:`~kombu.mixins.ConsumerProducerMixin` for consumers
+The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers
that need to also publish messages on a separate connection (e.g. sending rpc
replies, streaming results):
diff --git a/kombu/connection.py b/kombu/connection.py
index 989a5868..65dcb52b 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -434,12 +434,13 @@ class Connection(object):
if not reraise_as_library_errors:
ctx = self._dummy_context
with ctx():
- return retry_over_time(
+ self._connection = self._connection or retry_over_time(
self._connection_factory, self.recoverable_connection_errors,
(), {}, on_error, max_retries,
interval_start, interval_step, interval_max,
callback, timeout=timeout
)
+ return self._connection
@contextmanager
def _reraise_as_library_errors(
@@ -860,7 +861,7 @@ class Connection(object):
if not self._closed:
if not self.connected:
conn_opts = self._extract_failover_opts()
- self._connection = self._ensure_connection(**conn_opts)
+ self._ensure_connection(**conn_opts)
return self._connection
def _connection_factory(self):
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py
index 017dfae4..13834113 100644
--- a/t/unit/test_connection.py
+++ b/t/unit/test_connection.py
@@ -131,7 +131,9 @@ class test_Connection:
def test_establish_connection(self):
conn = self.conn
+ assert not conn.connected
conn.connect()
+ assert conn.connected
assert conn.connection.connected
assert conn.host == 'localhost:5672'
channel = conn.channel()
@@ -142,6 +144,10 @@ class test_Connection:
assert not _connection.connected
assert isinstance(conn.transport, Transport)
+ def test_reuse_connection(self):
+ conn = self.conn
+ assert conn.connect() is conn.connection is conn.connect()
+
def test_connect_no_transport_options(self):
conn = self.conn
conn._ensure_connection = Mock()