summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRezaSi <rezashiri88@gmail.com>2020-11-08 19:51:13 +0330
committerGitHub <noreply@github.com>2020-11-08 18:21:13 +0200
commitaec7daab8dacc4a6c2b265fea9a4d15648432e69 (patch)
treee333a090aa5237b85a64c452997bb4f40a339f3f
parentb3f371333ff7df793628809e8abdbf43fd4391ba (diff)
downloadpy-amqp-aec7daab8dacc4a6c2b265fea9a4d15648432e69.tar.gz
Basic publish confirm timeout fix (#343)
* Reuse the timeout for publishing to wait for a response. * Added the confirm_timeout keyword argument. If a timeout was specified and confirm_timeout was not use the timeout. Otherwise, use the confirm_timeout. * Fix unit test. * Add document for timeout and confirm_timeout in _basic_publish Co-authored-by: Omer Katz <omer.drow@gmail.com> Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir>
-rw-r--r--amqp/channel.py34
-rw-r--r--t/unit/test_channel.py4
2 files changed, 32 insertions, 6 deletions
diff --git a/amqp/channel.py b/amqp/channel.py
index 8918210..b28d5cf 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -1674,6 +1674,7 @@ class Channel(AbstractChannel):
def _basic_publish(self, msg, exchange='', routing_key='',
mandatory=False, immediate=False, timeout=None,
+ confirm_timeout=None,
argsig='Bssbb'):
"""Publish a message.
@@ -1683,9 +1684,11 @@ class Channel(AbstractChannel):
transaction, if any, is committed.
When channel is in confirm mode (when Connection parameter
- confirm_publish is set to True), each message is confirmed. When
- broker rejects published message (e.g. due internal broker
- constrains), MessageNacked exception is raised.
+ confirm_publish is set to True), each message is confirmed.
+ When broker rejects published message (e.g. due internal broker
+ constrains), MessageNacked exception is raised and
+ set confirm_timeout to wait maximum confirm_timeout second
+ for message to confirm.
PARAMETERS:
exchange: shortstr
@@ -1743,12 +1746,28 @@ class Channel(AbstractChannel):
RULE:
The server SHOULD implement the immediate flag.
+
+ timeout: short
+
+ timeout for publish
+
+ Set timeout to wait maximum timeout second
+ for message to publish.
+
+ confirm_timeout: short
+
+ confirm_timeout for publish in confirm mode
+
+ When the channel is in confirm mode set
+ confirm_timeout to wait maximum confirm_timeout
+ second for message to confirm.
+
"""
if not self.connection:
raise RecoverableConnectionError(
'basic_publish: connection closed')
- capabilities = self.connection.\
+ capabilities = self.connection. \
client_properties.get('capabilities', {})
if capabilities.get('connection.blocked', False):
try:
@@ -1765,9 +1784,11 @@ class Channel(AbstractChannel):
)
except socket.timeout:
raise RecoverableChannelError('basic_publish: timed out')
+
basic_publish = _basic_publish
def basic_publish_confirm(self, *args, **kwargs):
+ confirm_timeout = kwargs.pop('confirm_timeout', None)
def confirm_handler(method, *args):
# When RMQ nacks message we are raising MessageNacked exception
@@ -1779,7 +1800,10 @@ class Channel(AbstractChannel):
self.confirm_select()
ret = self._basic_publish(*args, **kwargs)
# Waiting for confirmation of message.
- self.wait([spec.Basic.Ack, spec.Basic.Nack], callback=confirm_handler)
+ timeout = confirm_timeout or kwargs.get('timeout', None)
+ self.wait([spec.Basic.Ack, spec.Basic.Nack],
+ callback=confirm_handler,
+ timeout=timeout)
return ret
def basic_qos(self, prefetch_size, prefetch_count, a_global,
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index c8f1f9a..096ffcc 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -403,7 +403,9 @@ class test_Channel:
self.c._basic_publish.assert_called_with(1, 2, arg=1)
assert ret is self.c._basic_publish()
self.c.wait.assert_called_with(
- [spec.Basic.Ack, spec.Basic.Nack], callback=ANY
+ [spec.Basic.Ack, spec.Basic.Nack],
+ callback=ANY,
+ timeout=None
)
self.c.basic_publish_confirm(1, 2, arg=1)