diff options
author | RezaSi <rezashiri88@gmail.com> | 2020-11-17 11:36:43 +0330 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-17 14:06:43 +0600 |
commit | b8594f8a3372fda8f2fbf5d9644e4089ae921637 (patch) | |
tree | 0a8ef2f413e3a008662ed18f48996145280c295e | |
parent | 1f2392beae003383ad243d4e7bb18fcae0d27cf1 (diff) | |
download | kombu-b8594f8a3372fda8f2fbf5d9644e4089ae921637.tar.gz |
Add timeout to producer publish (#1269)
* Add timeout and confirm_timeout to producer publish
* Remove confirm_timeout and add test for timeout
* Fix test
Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir>
-rw-r--r-- | kombu/messaging.py | 14 | ||||
-rw-r--r-- | t/unit/test_messaging.py | 8 |
2 files changed, 18 insertions, 4 deletions
diff --git a/kombu/messaging.py b/kombu/messaging.py index 71a6cd48..e762c537 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -115,7 +115,7 @@ class Producer: mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, - retry_policy=None, declare=None, expiration=None, + retry_policy=None, declare=None, expiration=None, timeout=None, **properties): """Publish message to the specified exchange. @@ -144,6 +144,8 @@ class Producer: supported by :meth:`~kombu.Connection.ensure`. expiration (float): A TTL in seconds can be specified per message. Default is no expiration. + timeout (float): Set timeout to wait maximum timeout second + for message to publish. **properties (Any): Additional message properties, see AMQP spec. """ _publish = self._publish @@ -175,12 +177,12 @@ class Producer: return _publish( body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, - exchange_name, declare, + exchange_name, declare, timeout ) def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, - immediate, exchange, declare): + immediate, exchange, declare, timeout=None): channel = self.channel message = channel.prepare_message( body, priority, content_type, @@ -198,6 +200,7 @@ class Producer: message, exchange=exchange, routing_key=routing_key, mandatory=mandatory, immediate=immediate, + timeout=timeout ) def _get_channel(self): @@ -211,6 +214,7 @@ class Producer: def _set_channel(self, channel): self._channel = channel + channel = property(_get_channel, _set_channel) def revive(self, channel): @@ -237,6 +241,7 @@ class Producer: def release(self): pass + close = release def _prepare(self, body, serializer=None, content_type=None, @@ -358,7 +363,7 @@ class Consumer: #: Mapping of queues we consume from. _queues = None - _tags = count(1) # global + _tags = count(1) # global def __init__(self, channel, queues=None, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None, on_message=None, @@ -483,6 +488,7 @@ class Consumer: for tag in self._active_tags.values(): cancel(tag) self._active_tags.clear() + close = cancel def cancel_by_queue(self, queue): diff --git a/t/unit/test_messaging.py b/t/unit/test_messaging.py index 52c09ad3..dc673e07 100644 --- a/t/unit/test_messaging.py +++ b/t/unit/test_messaging.py @@ -137,6 +137,14 @@ class test_Producer: properties = p._channel.prepare_message.call_args[0][5] assert properties['expiration'] == '10000' + def test_publish_with_timeout(self): + p = self.connection.Producer() + p.channel = Mock() + p.channel.connection.client.declared_entities = set() + p.publish('test_timeout', exchange=Exchange('foo'), timeout=1) + timeout = p._channel.basic_publish.call_args[1]['timeout'] + assert timeout == 1 + def test_publish_with_reply_to(self): p = self.connection.Producer() p.channel = Mock() |