diff options
author | Asif Saif Uddin (Auvi) <auvipy@gmail.com> | 2020-06-03 09:58:20 +0600 |
---|---|---|
committer | Asif Saif Uddin (Auvi) <auvipy@gmail.com> | 2020-06-03 09:58:20 +0600 |
commit | 0f4a2af4a8767b7e0a4da46f65cc8e7ffd829f9b (patch) | |
tree | d9a1ed11973b2baa6e5469c487e0c757ea9ab092 | |
parent | b858bfb97bdfd5a790bb8435ef1dab7672b4530f (diff) | |
parent | ff7c1e304177e9eacc032fd0b875fe7ac0e6229e (diff) | |
download | kombu-0f4a2af4a8767b7e0a4da46f65cc8e7ffd829f9b.tar.gz |
Merge branch 'master' of https://github.com/celery/kombu
-rw-r--r-- | docs/userguide/consumers.rst | 81 | ||||
-rw-r--r-- | kombu/connection.py | 5 | ||||
-rw-r--r-- | t/unit/test_connection.py | 6 |
3 files changed, 77 insertions, 15 deletions
diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst index cb943b7b..fca4c0ce 100644 --- a/docs/userguide/consumers.rst +++ b/docs/userguide/consumers.rst @@ -9,7 +9,7 @@ Basics ====== -The :class:`Consumer` takes a connection (or channel) and a list of queues to +The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to consume from. Several consumers can be mixed to consume from different channels, as they all bind to the same connection, and ``drain_events`` will drain events from all channels on that connection. @@ -22,30 +22,85 @@ drain events from all channels on that connection. .. code-block:: python - Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) + >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) +You can create a consumer using a Connection. Consumer is consuming from single queue with name `'queue'`: -Draining events from a single consumer: +.. code-block:: python + + >>> queue = Queue('queue', routing_key='queue') + >>> consumer = connection.Consumer(queue) + +You can also instantiate Consumer directly, it takes a channel or a connection as an argument. This consumer also +consumes from single queue with name `'queue'`: + +.. code-block:: python + + >>> queue = Queue('queue', routing_key='queue') + >>> with Connection('amqp://') as conn: + ... with conn.channel() as channel: + ... consumer = Consumer(channel, queue) + +Consumer needs to specify handler of received data. This handler specified in form of callback. Callback function is called +by kombu library every time a new message is received. Callback is called with two parameters ``body`` containing deserialized +data sent by producer and :class:`~kombu.message.Message` instance ``message``. User is also responsible for acknowledging of message when manual +acknowledge is set. + +.. code-block:: python + + >>> def callback(body, message): + ... print(body) + ... message.ack() + + >>> consumer.register_callback(callback) + +Draining events from a single consumer. Method ``drain_events`` by default blocks indefinitely. This example sets timeout to 1 second: .. code-block:: python - with Consumer(connection, queues, accept=['json']): - connection.drain_events(timeout=1) + >>> with consumer: + ... connection.drain_events(timeout=1) +Draining events from several consumers. Each consumer has its own list of queues. Each consumer accepts `'json'` format of data: + +.. code-block:: python -Draining events from several consumers: + >>> from kombu.utils.compat import nested + + >>> queues1 = [Queue('queue11', routing_key='queue12')] + >>> queues2 = [Queue('queue21', routing_key='queue22')] + >>> with connection.channel(), connection.channel() as (channel1, channel2): + ... with nested(Consumer(channel1, queues1, accept=['json']), + ... Consumer(channel2, queues2, accept=['json'])): + ... connection.drain_events(timeout=1) + +The full example will look as follows: .. code-block:: python - from kombu.utils.compat import nested + from kombu import Connection, Consumer, Queue + + def callback(body, message): + print('RECEIVED MESSAGE: {0!r}'.format(body)) + message.ack() + + queue1 = Queue('queue1', routing_key='queue1') + queue2 = Queue('queue2', routing_key='queue2') - with connection.channel(), connection.channel() as (channel1, channel2): - with nested(Consumer(channel1, queues1, accept=['json']), - Consumer(channel2, queues2, accept=['json'])): - connection.drain_events(timeout=1) + with Connection('amqp://') as conn: + with conn.channel() as channel: + consumer = Consumer(conn, [queue1, queue2], accept=['json']) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) +Consumer mixin classes +====================== -Or using :class:`~kombu.mixins.ConsumerMixin`: +Kombu provides predefined mixin classes in module :py:mod:`~kombu.mixins`. It contains two classes: +:class:`~kombu.mixins.ConsumerMixin` for creating consumers and :class:`~kombu.mixins.ConsumerProducerMixin` +for creating consumers supporting also publishing messages. Consumers can be created just by subclassing +mixin class and overriding some of the methods: .. code-block:: python @@ -97,7 +152,7 @@ and with multiple channels again: C(connection).run() -There's also a :class:`~kombu.mixins.ConsumerProducerMixin` for consumers +The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers that need to also publish messages on a separate connection (e.g. sending rpc replies, streaming results): diff --git a/kombu/connection.py b/kombu/connection.py index 989a5868..65dcb52b 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -434,12 +434,13 @@ class Connection(object): if not reraise_as_library_errors: ctx = self._dummy_context with ctx(): - return retry_over_time( + self._connection = self._connection or retry_over_time( self._connection_factory, self.recoverable_connection_errors, (), {}, on_error, max_retries, interval_start, interval_step, interval_max, callback, timeout=timeout ) + return self._connection @contextmanager def _reraise_as_library_errors( @@ -860,7 +861,7 @@ class Connection(object): if not self._closed: if not self.connected: conn_opts = self._extract_failover_opts() - self._connection = self._ensure_connection(**conn_opts) + self._ensure_connection(**conn_opts) return self._connection def _connection_factory(self): diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index 017dfae4..13834113 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -131,7 +131,9 @@ class test_Connection: def test_establish_connection(self): conn = self.conn + assert not conn.connected conn.connect() + assert conn.connected assert conn.connection.connected assert conn.host == 'localhost:5672' channel = conn.channel() @@ -142,6 +144,10 @@ class test_Connection: assert not _connection.connected assert isinstance(conn.transport, Transport) + def test_reuse_connection(self): + conn = self.conn + assert conn.connect() is conn.connection is conn.connect() + def test_connect_no_transport_options(self): conn = self.conn conn._ensure_connection = Mock() |