diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-06-01 11:04:32 +0200 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-06-01 18:59:14 +0600 |
commit | 1be72e3b6a2fb0466ce68380f33c8138f51b6cbf (patch) | |
tree | e93906f463f8b9d4a52567547227929ae7bfe9a8 /docs | |
parent | 3907e875072706bb33e35455339bc602bc8c79a3 (diff) | |
download | kombu-1be72e3b6a2fb0466ce68380f33c8138f51b6cbf.tar.gz |
Improved Consumer user guide
Diffstat (limited to 'docs')
-rw-r--r-- | docs/userguide/consumers.rst | 81 |
1 files changed, 68 insertions, 13 deletions
diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst index cb943b7b..fca4c0ce 100644 --- a/docs/userguide/consumers.rst +++ b/docs/userguide/consumers.rst @@ -9,7 +9,7 @@ Basics ====== -The :class:`Consumer` takes a connection (or channel) and a list of queues to +The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to consume from. Several consumers can be mixed to consume from different channels, as they all bind to the same connection, and ``drain_events`` will drain events from all channels on that connection. @@ -22,30 +22,85 @@ drain events from all channels on that connection. .. code-block:: python - Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) + >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) +You can create a consumer using a Connection. Consumer is consuming from single queue with name `'queue'`: -Draining events from a single consumer: +.. code-block:: python + + >>> queue = Queue('queue', routing_key='queue') + >>> consumer = connection.Consumer(queue) + +You can also instantiate Consumer directly, it takes a channel or a connection as an argument. This consumer also +consumes from single queue with name `'queue'`: + +.. code-block:: python + + >>> queue = Queue('queue', routing_key='queue') + >>> with Connection('amqp://') as conn: + ... with conn.channel() as channel: + ... consumer = Consumer(channel, queue) + +Consumer needs to specify handler of received data. This handler specified in form of callback. Callback function is called +by kombu library every time a new message is received. Callback is called with two parameters ``body`` containing deserialized +data sent by producer and :class:`~kombu.message.Message` instance ``message``. User is also responsible for acknowledging of message when manual +acknowledge is set. + +.. code-block:: python + + >>> def callback(body, message): + ... print(body) + ... message.ack() + + >>> consumer.register_callback(callback) + +Draining events from a single consumer. Method ``drain_events`` by default blocks indefinitely. This example sets timeout to 1 second: .. code-block:: python - with Consumer(connection, queues, accept=['json']): - connection.drain_events(timeout=1) + >>> with consumer: + ... connection.drain_events(timeout=1) +Draining events from several consumers. Each consumer has its own list of queues. Each consumer accepts `'json'` format of data: + +.. code-block:: python -Draining events from several consumers: + >>> from kombu.utils.compat import nested + + >>> queues1 = [Queue('queue11', routing_key='queue12')] + >>> queues2 = [Queue('queue21', routing_key='queue22')] + >>> with connection.channel(), connection.channel() as (channel1, channel2): + ... with nested(Consumer(channel1, queues1, accept=['json']), + ... Consumer(channel2, queues2, accept=['json'])): + ... connection.drain_events(timeout=1) + +The full example will look as follows: .. code-block:: python - from kombu.utils.compat import nested + from kombu import Connection, Consumer, Queue + + def callback(body, message): + print('RECEIVED MESSAGE: {0!r}'.format(body)) + message.ack() + + queue1 = Queue('queue1', routing_key='queue1') + queue2 = Queue('queue2', routing_key='queue2') - with connection.channel(), connection.channel() as (channel1, channel2): - with nested(Consumer(channel1, queues1, accept=['json']), - Consumer(channel2, queues2, accept=['json'])): - connection.drain_events(timeout=1) + with Connection('amqp://') as conn: + with conn.channel() as channel: + consumer = Consumer(conn, [queue1, queue2], accept=['json']) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) +Consumer mixin classes +====================== -Or using :class:`~kombu.mixins.ConsumerMixin`: +Kombu provides predefined mixin classes in module :py:mod:`~kombu.mixins`. It contains two classes: +:class:`~kombu.mixins.ConsumerMixin` for creating consumers and :class:`~kombu.mixins.ConsumerProducerMixin` +for creating consumers supporting also publishing messages. Consumers can be created just by subclassing +mixin class and overriding some of the methods: .. code-block:: python @@ -97,7 +152,7 @@ and with multiple channels again: C(connection).run() -There's also a :class:`~kombu.mixins.ConsumerProducerMixin` for consumers +The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers that need to also publish messages on a separate connection (e.g. sending rpc replies, streaming results): |