summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-06-01 11:04:32 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2020-06-01 18:59:14 +0600
commit1be72e3b6a2fb0466ce68380f33c8138f51b6cbf (patch)
treee93906f463f8b9d4a52567547227929ae7bfe9a8 /docs
parent3907e875072706bb33e35455339bc602bc8c79a3 (diff)
downloadkombu-1be72e3b6a2fb0466ce68380f33c8138f51b6cbf.tar.gz
Improved Consumer user guide
Diffstat (limited to 'docs')
-rw-r--r--docs/userguide/consumers.rst81
1 files changed, 68 insertions, 13 deletions
diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst
index cb943b7b..fca4c0ce 100644
--- a/docs/userguide/consumers.rst
+++ b/docs/userguide/consumers.rst
@@ -9,7 +9,7 @@
Basics
======
-The :class:`Consumer` takes a connection (or channel) and a list of queues to
+The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to
consume from. Several consumers can be mixed to consume from different
channels, as they all bind to the same connection, and ``drain_events`` will
drain events from all channels on that connection.
@@ -22,30 +22,85 @@ drain events from all channels on that connection.
.. code-block:: python
- Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
+ >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
+You can create a consumer using a Connection. Consumer is consuming from single queue with name `'queue'`:
-Draining events from a single consumer:
+.. code-block:: python
+
+ >>> queue = Queue('queue', routing_key='queue')
+ >>> consumer = connection.Consumer(queue)
+
+You can also instantiate Consumer directly, it takes a channel or a connection as an argument. This consumer also
+consumes from single queue with name `'queue'`:
+
+.. code-block:: python
+
+ >>> queue = Queue('queue', routing_key='queue')
+ >>> with Connection('amqp://') as conn:
+ ... with conn.channel() as channel:
+ ... consumer = Consumer(channel, queue)
+
+Consumer needs to specify handler of received data. This handler specified in form of callback. Callback function is called
+by kombu library every time a new message is received. Callback is called with two parameters ``body`` containing deserialized
+data sent by producer and :class:`~kombu.message.Message` instance ``message``. User is also responsible for acknowledging of message when manual
+acknowledge is set.
+
+.. code-block:: python
+
+ >>> def callback(body, message):
+ ... print(body)
+ ... message.ack()
+
+ >>> consumer.register_callback(callback)
+
+Draining events from a single consumer. Method ``drain_events`` by default blocks indefinitely. This example sets timeout to 1 second:
.. code-block:: python
- with Consumer(connection, queues, accept=['json']):
- connection.drain_events(timeout=1)
+ >>> with consumer:
+ ... connection.drain_events(timeout=1)
+Draining events from several consumers. Each consumer has its own list of queues. Each consumer accepts `'json'` format of data:
+
+.. code-block:: python
-Draining events from several consumers:
+ >>> from kombu.utils.compat import nested
+
+ >>> queues1 = [Queue('queue11', routing_key='queue12')]
+ >>> queues2 = [Queue('queue21', routing_key='queue22')]
+ >>> with connection.channel(), connection.channel() as (channel1, channel2):
+ ... with nested(Consumer(channel1, queues1, accept=['json']),
+ ... Consumer(channel2, queues2, accept=['json'])):
+ ... connection.drain_events(timeout=1)
+
+The full example will look as follows:
.. code-block:: python
- from kombu.utils.compat import nested
+ from kombu import Connection, Consumer, Queue
+
+ def callback(body, message):
+ print('RECEIVED MESSAGE: {0!r}'.format(body))
+ message.ack()
+
+ queue1 = Queue('queue1', routing_key='queue1')
+ queue2 = Queue('queue2', routing_key='queue2')
- with connection.channel(), connection.channel() as (channel1, channel2):
- with nested(Consumer(channel1, queues1, accept=['json']),
- Consumer(channel2, queues2, accept=['json'])):
- connection.drain_events(timeout=1)
+ with Connection('amqp://') as conn:
+ with conn.channel() as channel:
+ consumer = Consumer(conn, [queue1, queue2], accept=['json'])
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+Consumer mixin classes
+======================
-Or using :class:`~kombu.mixins.ConsumerMixin`:
+Kombu provides predefined mixin classes in module :py:mod:`~kombu.mixins`. It contains two classes:
+:class:`~kombu.mixins.ConsumerMixin` for creating consumers and :class:`~kombu.mixins.ConsumerProducerMixin`
+for creating consumers supporting also publishing messages. Consumers can be created just by subclassing
+mixin class and overriding some of the methods:
.. code-block:: python
@@ -97,7 +152,7 @@ and with multiple channels again:
C(connection).run()
-There's also a :class:`~kombu.mixins.ConsumerProducerMixin` for consumers
+The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers
that need to also publish messages on a separate connection (e.g. sending rpc
replies, streaming results):