diff options
author | RezaSi <rezashiri88@gmail.com> | 2020-11-08 19:51:13 +0330 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-08 18:21:13 +0200 |
commit | aec7daab8dacc4a6c2b265fea9a4d15648432e69 (patch) | |
tree | e333a090aa5237b85a64c452997bb4f40a339f3f /amqp | |
parent | b3f371333ff7df793628809e8abdbf43fd4391ba (diff) | |
download | py-amqp-aec7daab8dacc4a6c2b265fea9a4d15648432e69.tar.gz |
Basic publish confirm timeout fix (#343)
* Reuse the timeout for publishing to wait for a response.
* Added the confirm_timeout keyword argument.
If a timeout was specified and confirm_timeout was not use the timeout.
Otherwise, use the confirm_timeout.
* Fix unit test.
* Add document for timeout and confirm_timeout in _basic_publish
Co-authored-by: Omer Katz <omer.drow@gmail.com>
Co-authored-by: Reza Shiri <rezashiri@cafebazaar.ir>
Diffstat (limited to 'amqp')
-rw-r--r-- | amqp/channel.py | 34 |
1 files changed, 29 insertions, 5 deletions
diff --git a/amqp/channel.py b/amqp/channel.py index 8918210..b28d5cf 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -1674,6 +1674,7 @@ class Channel(AbstractChannel): def _basic_publish(self, msg, exchange='', routing_key='', mandatory=False, immediate=False, timeout=None, + confirm_timeout=None, argsig='Bssbb'): """Publish a message. @@ -1683,9 +1684,11 @@ class Channel(AbstractChannel): transaction, if any, is committed. When channel is in confirm mode (when Connection parameter - confirm_publish is set to True), each message is confirmed. When - broker rejects published message (e.g. due internal broker - constrains), MessageNacked exception is raised. + confirm_publish is set to True), each message is confirmed. + When broker rejects published message (e.g. due internal broker + constrains), MessageNacked exception is raised and + set confirm_timeout to wait maximum confirm_timeout second + for message to confirm. PARAMETERS: exchange: shortstr @@ -1743,12 +1746,28 @@ class Channel(AbstractChannel): RULE: The server SHOULD implement the immediate flag. + + timeout: short + + timeout for publish + + Set timeout to wait maximum timeout second + for message to publish. + + confirm_timeout: short + + confirm_timeout for publish in confirm mode + + When the channel is in confirm mode set + confirm_timeout to wait maximum confirm_timeout + second for message to confirm. + """ if not self.connection: raise RecoverableConnectionError( 'basic_publish: connection closed') - capabilities = self.connection.\ + capabilities = self.connection. \ client_properties.get('capabilities', {}) if capabilities.get('connection.blocked', False): try: @@ -1765,9 +1784,11 @@ class Channel(AbstractChannel): ) except socket.timeout: raise RecoverableChannelError('basic_publish: timed out') + basic_publish = _basic_publish def basic_publish_confirm(self, *args, **kwargs): + confirm_timeout = kwargs.pop('confirm_timeout', None) def confirm_handler(method, *args): # When RMQ nacks message we are raising MessageNacked exception @@ -1779,7 +1800,10 @@ class Channel(AbstractChannel): self.confirm_select() ret = self._basic_publish(*args, **kwargs) # Waiting for confirmation of message. - self.wait([spec.Basic.Ack, spec.Basic.Nack], callback=confirm_handler) + timeout = confirm_timeout or kwargs.get('timeout', None) + self.wait([spec.Basic.Ack, spec.Basic.Nack], + callback=confirm_handler, + timeout=timeout) return ret def basic_qos(self, prefetch_size, prefetch_count, a_global, |