summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarlos Corbacho <carlos@strangeworlds.co.uk>2021-12-22 04:50:42 +0000
committerGitHub <noreply@github.com>2021-12-22 10:50:42 +0600
commitf54e47d2ea8fe1546fcb058b9efd78c6f1a8ca74 (patch)
tree50e0970ed765494dca6be126cfdd5c55c82b62e7
parent1390bc0b6758a38b4a5329889db0785ea831309d (diff)
downloadpy-amqp-v2.6.tar.gz
Backport basic publish confirm fix to 2.6 (#360)v2.6
* Reuse the timeout for publishing to wait for a response. * Added the confirm_timeout keyword argument. If a timeout was specified and confirm_timeout was not use the timeout. Otherwise, use the confirm_timeout. * Fix unit test. * Basic publish confirm timeout fix (#343) * Reuse the timeout for publishing to wait for a response. * Added the confirm_timeout keyword argument. If a timeout was specified and confirm_timeout was not use the timeout. Otherwise, use the confirm_timeout. * Fix unit test. * Add document for timeout and confirm_timeout in _basic_publish Co-authored-by: Omer Katz <omer.drow@gmail.com> Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir> * Bump version to 2.6.2 for new release Co-authored-by: Omer Katz <omer.drow@gmail.com> Co-authored-by: RezaSi <rezashiri88@gmail.com> Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir> Co-authored-by: Carlos Corbacho <carlos.corbacho@complyadvantage.com>
-rw-r--r--.bumpversion.cfg2
-rw-r--r--Changelog11
-rw-r--r--README.rst2
-rw-r--r--amqp/__init__.py2
-rw-r--r--amqp/channel.py34
-rw-r--r--docs/includes/introduction.txt2
-rw-r--r--t/unit/test_channel.py4
7 files changed, 47 insertions, 10 deletions
diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 86b6e49..2df8172 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 2.6.1
+current_version = 2.6.2
commit = True
tag = True
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?P<releaselevel>[a-z]+)?
diff --git a/Changelog b/Changelog
index 2d63132..08a873b 100644
--- a/Changelog
+++ b/Changelog
@@ -5,6 +5,17 @@ py-amqp is fork of amqplib used by Kombu containing additional features and impr
The previous amqplib changelog is here:
http://code.google.com/p/py-amqplib/source/browse/CHANGES
+.. _version-2.6.2:
+
+2.6.2
+=====
+:release-date:
+:release-by:
+
+- Fix infinite wait when using confirm_publish
+
+ Contributed by **Omer Katz** & **RezaSi**
+
.. _version-2.6.0:
2.6.1
diff --git a/README.rst b/README.rst
index 7290c67..5c69837 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
|build-status| |coverage| |license| |wheel| |pyversion| |pyimp|
-:Version: 2.6.1
+:Version: 2.6.2
:Web: https://amqp.readthedocs.io/
:Download: https://pypi.org/project/amqp/
:Source: http://github.com/celery/py-amqp/
diff --git a/amqp/__init__.py b/amqp/__init__.py
index d15799e..45f95e1 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -6,7 +6,7 @@ import re
from collections import namedtuple
-__version__ = '2.6.1'
+__version__ = '2.6.2'
__author__ = 'Barry Pederson'
__maintainer__ = 'Asif Saif Uddin, Matus Valo'
__contact__ = 'pyamqp@celeryproject.org'
diff --git a/amqp/channel.py b/amqp/channel.py
index 6c20102..8718c17 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -1676,6 +1676,7 @@ class Channel(AbstractChannel):
def _basic_publish(self, msg, exchange='', routing_key='',
mandatory=False, immediate=False, timeout=None,
+ confirm_timeout=None,
argsig='Bssbb'):
"""Publish a message.
@@ -1685,9 +1686,11 @@ class Channel(AbstractChannel):
transaction, if any, is committed.
When channel is in confirm mode (when Connection parameter
- confirm_publish is set to True), each message is confirmed. When
- broker rejects published message (e.g. due internal broker
- constrains), MessageNacked exception is raised.
+ confirm_publish is set to True), each message is confirmed.
+ When broker rejects published message (e.g. due internal broker
+ constrains), MessageNacked exception is raised and
+ set confirm_timeout to wait maximum confirm_timeout second
+ for message to confirm.
PARAMETERS:
exchange: shortstr
@@ -1745,12 +1748,28 @@ class Channel(AbstractChannel):
RULE:
The server SHOULD implement the immediate flag.
+
+ timeout: short
+
+ timeout for publish
+
+ Set timeout to wait maximum timeout second
+ for message to publish.
+
+ confirm_timeout: short
+
+ confirm_timeout for publish in confirm mode
+
+ When the channel is in confirm mode set
+ confirm_timeout to wait maximum confirm_timeout
+ second for message to confirm.
+
"""
if not self.connection:
raise RecoverableConnectionError(
'basic_publish: connection closed')
- capabilities = self.connection.\
+ capabilities = self.connection. \
client_properties.get('capabilities', {})
if capabilities.get('connection.blocked', False):
try:
@@ -1767,9 +1786,11 @@ class Channel(AbstractChannel):
)
except socket.timeout:
raise RecoverableChannelError('basic_publish: timed out')
+
basic_publish = _basic_publish
def basic_publish_confirm(self, *args, **kwargs):
+ confirm_timeout = kwargs.pop('confirm_timeout', None)
def confirm_handler(method, *args):
# When RMQ nacks message we are raising MessageNacked exception
@@ -1781,7 +1802,10 @@ class Channel(AbstractChannel):
self.confirm_select()
ret = self._basic_publish(*args, **kwargs)
# Waiting for confirmation of message.
- self.wait([spec.Basic.Ack, spec.Basic.Nack], callback=confirm_handler)
+ timeout = confirm_timeout or kwargs.get('timeout', None)
+ self.wait([spec.Basic.Ack, spec.Basic.Nack],
+ callback=confirm_handler,
+ timeout=timeout)
return ret
def basic_qos(self, prefetch_size, prefetch_count, a_global,
diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt
index 021e0a0..00851d6 100644
--- a/docs/includes/introduction.txt
+++ b/docs/includes/introduction.txt
@@ -1,4 +1,4 @@
-:Version: 2.6.1
+:Version: 2.6.2
:Web: https://amqp.readthedocs.io/
:Download: https://pypi.org/project/amqp/
:Source: http://github.com/celery/py-amqp/
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index 1e05442..12228d1 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -404,7 +404,9 @@ class test_Channel:
self.c._basic_publish.assert_called_with(1, 2, arg=1)
assert ret is self.c._basic_publish()
self.c.wait.assert_called_with(
- [spec.Basic.Ack, spec.Basic.Nack], callback=ANY
+ [spec.Basic.Ack, spec.Basic.Nack],
+ callback=ANY,
+ timeout=None
)
self.c.basic_publish_confirm(1, 2, arg=1)