summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRezaSi <rezashiri88@gmail.com>2020-11-17 11:36:43 +0330
committerGitHub <noreply@github.com>2020-11-17 14:06:43 +0600
commitb8594f8a3372fda8f2fbf5d9644e4089ae921637 (patch)
tree0a8ef2f413e3a008662ed18f48996145280c295e
parent1f2392beae003383ad243d4e7bb18fcae0d27cf1 (diff)
downloadkombu-b8594f8a3372fda8f2fbf5d9644e4089ae921637.tar.gz
Add timeout to producer publish (#1269)
* Add timeout and confirm_timeout to producer publish * Remove confirm_timeout and add test for timeout * Fix test Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir>
-rw-r--r--kombu/messaging.py14
-rw-r--r--t/unit/test_messaging.py8
2 files changed, 18 insertions, 4 deletions
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 71a6cd48..e762c537 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -115,7 +115,7 @@ class Producer:
mandatory=False, immediate=False, priority=0,
content_type=None, content_encoding=None, serializer=None,
headers=None, compression=None, exchange=None, retry=False,
- retry_policy=None, declare=None, expiration=None,
+ retry_policy=None, declare=None, expiration=None, timeout=None,
**properties):
"""Publish message to the specified exchange.
@@ -144,6 +144,8 @@ class Producer:
supported by :meth:`~kombu.Connection.ensure`.
expiration (float): A TTL in seconds can be specified per message.
Default is no expiration.
+ timeout (float): Set timeout to wait maximum timeout second
+ for message to publish.
**properties (Any): Additional message properties, see AMQP spec.
"""
_publish = self._publish
@@ -175,12 +177,12 @@ class Producer:
return _publish(
body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory, immediate,
- exchange_name, declare,
+ exchange_name, declare, timeout
)
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
- immediate, exchange, declare):
+ immediate, exchange, declare, timeout=None):
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
@@ -198,6 +200,7 @@ class Producer:
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
+ timeout=timeout
)
def _get_channel(self):
@@ -211,6 +214,7 @@ class Producer:
def _set_channel(self, channel):
self._channel = channel
+
channel = property(_get_channel, _set_channel)
def revive(self, channel):
@@ -237,6 +241,7 @@ class Producer:
def release(self):
pass
+
close = release
def _prepare(self, body, serializer=None, content_type=None,
@@ -358,7 +363,7 @@ class Consumer:
#: Mapping of queues we consume from.
_queues = None
- _tags = count(1) # global
+ _tags = count(1) # global
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None, on_message=None,
@@ -483,6 +488,7 @@ class Consumer:
for tag in self._active_tags.values():
cancel(tag)
self._active_tags.clear()
+
close = cancel
def cancel_by_queue(self, queue):
diff --git a/t/unit/test_messaging.py b/t/unit/test_messaging.py
index 52c09ad3..dc673e07 100644
--- a/t/unit/test_messaging.py
+++ b/t/unit/test_messaging.py
@@ -137,6 +137,14 @@ class test_Producer:
properties = p._channel.prepare_message.call_args[0][5]
assert properties['expiration'] == '10000'
+ def test_publish_with_timeout(self):
+ p = self.connection.Producer()
+ p.channel = Mock()
+ p.channel.connection.client.declared_entities = set()
+ p.publish('test_timeout', exchange=Exchange('foo'), timeout=1)
+ timeout = p._channel.basic_publish.call_args[1]['timeout']
+ assert timeout == 1
+
def test_publish_with_reply_to(self):
p = self.connection.Producer()
p.channel = Mock()