diff options
author | Carlos Corbacho <carlos@strangeworlds.co.uk> | 2021-12-22 04:50:42 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-22 10:50:42 +0600 |
commit | f54e47d2ea8fe1546fcb058b9efd78c6f1a8ca74 (patch) | |
tree | 50e0970ed765494dca6be126cfdd5c55c82b62e7 | |
parent | 1390bc0b6758a38b4a5329889db0785ea831309d (diff) | |
download | py-amqp-v2.6.tar.gz |
Backport basic publish confirm fix to 2.6 (#360)v2.6
* Reuse the timeout for publishing to wait for a response.
* Added the confirm_timeout keyword argument.
If a timeout was specified and confirm_timeout was not use the timeout.
Otherwise, use the confirm_timeout.
* Fix unit test.
* Basic publish confirm timeout fix (#343)
* Reuse the timeout for publishing to wait for a response.
* Added the confirm_timeout keyword argument.
If a timeout was specified and confirm_timeout was not use the timeout.
Otherwise, use the confirm_timeout.
* Fix unit test.
* Add document for timeout and confirm_timeout in _basic_publish
Co-authored-by: Omer Katz <omer.drow@gmail.com>
Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir>
* Bump version to 2.6.2 for new release
Co-authored-by: Omer Katz <omer.drow@gmail.com>
Co-authored-by: RezaSi <rezashiri88@gmail.com>
Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir>
Co-authored-by: Carlos Corbacho <carlos.corbacho@complyadvantage.com>
-rw-r--r-- | .bumpversion.cfg | 2 | ||||
-rw-r--r-- | Changelog | 11 | ||||
-rw-r--r-- | README.rst | 2 | ||||
-rw-r--r-- | amqp/__init__.py | 2 | ||||
-rw-r--r-- | amqp/channel.py | 34 | ||||
-rw-r--r-- | docs/includes/introduction.txt | 2 | ||||
-rw-r--r-- | t/unit/test_channel.py | 4 |
7 files changed, 47 insertions, 10 deletions
diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 86b6e49..2df8172 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.6.1 +current_version = 2.6.2 commit = True tag = True parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?P<releaselevel>[a-z]+)? @@ -5,6 +5,17 @@ py-amqp is fork of amqplib used by Kombu containing additional features and impr The previous amqplib changelog is here: http://code.google.com/p/py-amqplib/source/browse/CHANGES +.. _version-2.6.2: + +2.6.2 +===== +:release-date: +:release-by: + +- Fix infinite wait when using confirm_publish + + Contributed by **Omer Katz** & **RezaSi** + .. _version-2.6.0: 2.6.1 @@ -4,7 +4,7 @@ |build-status| |coverage| |license| |wheel| |pyversion| |pyimp| -:Version: 2.6.1 +:Version: 2.6.2 :Web: https://amqp.readthedocs.io/ :Download: https://pypi.org/project/amqp/ :Source: http://github.com/celery/py-amqp/ diff --git a/amqp/__init__.py b/amqp/__init__.py index d15799e..45f95e1 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -6,7 +6,7 @@ import re from collections import namedtuple -__version__ = '2.6.1' +__version__ = '2.6.2' __author__ = 'Barry Pederson' __maintainer__ = 'Asif Saif Uddin, Matus Valo' __contact__ = 'pyamqp@celeryproject.org' diff --git a/amqp/channel.py b/amqp/channel.py index 6c20102..8718c17 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -1676,6 +1676,7 @@ class Channel(AbstractChannel): def _basic_publish(self, msg, exchange='', routing_key='', mandatory=False, immediate=False, timeout=None, + confirm_timeout=None, argsig='Bssbb'): """Publish a message. @@ -1685,9 +1686,11 @@ class Channel(AbstractChannel): transaction, if any, is committed. When channel is in confirm mode (when Connection parameter - confirm_publish is set to True), each message is confirmed. When - broker rejects published message (e.g. due internal broker - constrains), MessageNacked exception is raised. + confirm_publish is set to True), each message is confirmed. + When broker rejects published message (e.g. due internal broker + constrains), MessageNacked exception is raised and + set confirm_timeout to wait maximum confirm_timeout second + for message to confirm. PARAMETERS: exchange: shortstr @@ -1745,12 +1748,28 @@ class Channel(AbstractChannel): RULE: The server SHOULD implement the immediate flag. + + timeout: short + + timeout for publish + + Set timeout to wait maximum timeout second + for message to publish. + + confirm_timeout: short + + confirm_timeout for publish in confirm mode + + When the channel is in confirm mode set + confirm_timeout to wait maximum confirm_timeout + second for message to confirm. + """ if not self.connection: raise RecoverableConnectionError( 'basic_publish: connection closed') - capabilities = self.connection.\ + capabilities = self.connection. \ client_properties.get('capabilities', {}) if capabilities.get('connection.blocked', False): try: @@ -1767,9 +1786,11 @@ class Channel(AbstractChannel): ) except socket.timeout: raise RecoverableChannelError('basic_publish: timed out') + basic_publish = _basic_publish def basic_publish_confirm(self, *args, **kwargs): + confirm_timeout = kwargs.pop('confirm_timeout', None) def confirm_handler(method, *args): # When RMQ nacks message we are raising MessageNacked exception @@ -1781,7 +1802,10 @@ class Channel(AbstractChannel): self.confirm_select() ret = self._basic_publish(*args, **kwargs) # Waiting for confirmation of message. - self.wait([spec.Basic.Ack, spec.Basic.Nack], callback=confirm_handler) + timeout = confirm_timeout or kwargs.get('timeout', None) + self.wait([spec.Basic.Ack, spec.Basic.Nack], + callback=confirm_handler, + timeout=timeout) return ret def basic_qos(self, prefetch_size, prefetch_count, a_global, diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index 021e0a0..00851d6 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -1,4 +1,4 @@ -:Version: 2.6.1 +:Version: 2.6.2 :Web: https://amqp.readthedocs.io/ :Download: https://pypi.org/project/amqp/ :Source: http://github.com/celery/py-amqp/ diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index 1e05442..12228d1 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -404,7 +404,9 @@ class test_Channel: self.c._basic_publish.assert_called_with(1, 2, arg=1) assert ret is self.c._basic_publish() self.c.wait.assert_called_with( - [spec.Basic.Ack, spec.Basic.Nack], callback=ANY + [spec.Basic.Ack, spec.Basic.Nack], + callback=ANY, + timeout=None ) self.c.basic_publish_confirm(1, 2, arg=1) |