diff options
author | Matus Valo <matusvalo@users.noreply.github.com> | 2018-10-19 17:30:48 +0200 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2018-10-19 21:30:48 +0600 |
commit | f63f8fe9efbe3357741f32541ea2d7b063f7df09 (patch) | |
tree | 9ce8a17b163ffa61229902e46fd9fbc90ea34bc2 | |
parent | 77be4f202ad03a0b9292d8743633e3390f87f9a7 (diff) | |
download | py-amqp-f63f8fe9efbe3357741f32541ea2d7b063f7df09.tar.gz |
Handle negative acknowledgments sent by RMQ (#208)
* Handle negative acknowledgments sent by RMQ
* Make flake8 happy
* Update documentation
* Fix redundant parameter in _on_basic_nack + added unittests
* Increase coverage of unittests
* Move MessageNacked to exceptions.py
* Make flake8 happy
* Make flake8 happy
-rw-r--r-- | amqp/abstract_channel.py | 11 | ||||
-rw-r--r-- | amqp/channel.py | 22 | ||||
-rw-r--r-- | amqp/connection.py | 4 | ||||
-rw-r--r-- | amqp/exceptions.py | 6 | ||||
-rw-r--r-- | t/unit/test_abstract_channel.py | 6 | ||||
-rw-r--r-- | t/unit/test_channel.py | 63 |
6 files changed, 97 insertions, 15 deletions
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py index aaad554..138d472 100644 --- a/amqp/abstract_channel.py +++ b/amqp/abstract_channel.py @@ -106,17 +106,13 @@ class AbstractChannel(object): try: listeners = [self._callbacks[method_sig]] except KeyError: - listeners = None + listeners = [] + one_shot = None try: one_shot = self._pending.pop(method_sig) except KeyError: if not listeners: return - else: - if listeners is None: - listeners = [one_shot] - else: - listeners.append(one_shot) args = [] if amqp_method.args: @@ -127,6 +123,9 @@ class AbstractChannel(object): for listener in listeners: listener(*args) + if one_shot: + one_shot(method_sig, *args) + #: Placeholder, the concrete implementations will have to #: supply their own versions of _METHOD_MAP _METHODS = {} diff --git a/amqp/channel.py b/amqp/channel.py index 24255bc..db7b8cc 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -13,7 +13,7 @@ from . import spec from .abstract_channel import AbstractChannel from .exceptions import (ChannelError, ConsumerCancelled, RecoverableChannelError, RecoverableConnectionError, - error_for_code) + error_for_code, MessageNacked) from .five import Queue from .protocol import queue_declare_ok_t @@ -93,6 +93,7 @@ class Channel(AbstractChannel): spec.method(spec.Tx.SelectOk), spec.method(spec.Confirm.SelectOk), spec.method(spec.Basic.Ack, 'Lb'), + spec.method(spec.Basic.Nack, 'Lb'), } _METHODS = {m.method_sig: m for m in _METHODS} @@ -138,6 +139,7 @@ class Channel(AbstractChannel): spec.Basic.Deliver: self._on_basic_deliver, spec.Basic.Return: self._on_basic_return, spec.Basic.Ack: self._on_basic_ack, + spec.Basic.Nack: self._on_basic_nack, }) def collect(self): @@ -1665,6 +1667,11 @@ class Channel(AbstractChannel): configuration and distributed to any active consumers when the transaction, if any, is committed. + When channel is in confirm mode (when Connection parameter + confirm_publish is set to True), each message is confirmed. When + broker rejects published message (e.g. due internal broker + constrains), MessageNacked exception is raised. + PARAMETERS: exchange: shortstr @@ -1736,11 +1743,18 @@ class Channel(AbstractChannel): basic_publish = _basic_publish def basic_publish_confirm(self, *args, **kwargs): + + def confirm_handler(method, *args): + # When RMQ nacks message we are raising MessageNacked exception + if method == spec.Basic.Nack: + raise MessageNacked() + if not self._confirm_selected: self._confirm_selected = True self.confirm_select() ret = self._basic_publish(*args, **kwargs) - self.wait(spec.Basic.Ack) + # Waiting for confirmation of message. + self.wait([spec.Basic.Ack, spec.Basic.Nack], callback=confirm_handler) return ret def basic_qos(self, prefetch_size, prefetch_count, a_global, @@ -2037,3 +2051,7 @@ class Channel(AbstractChannel): def _on_basic_ack(self, delivery_tag, multiple): for callback in self.events['basic_ack']: callback(delivery_tag, multiple) + + def _on_basic_nack(self, delivery_tag, multiple): + for callback in self.events['basic_nack']: + callback(delivery_tag, multiple) diff --git a/amqp/connection.py b/amqp/connection.py index ea56a2d..6e47798 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -97,6 +97,10 @@ class Connection(AbstractChannel): The "socket_settings" parameter is a dictionary defining tcp settings which will be applied as socket options. + + When "confirm_publish" is set to True, the channel is put to + confirm mode. In this mode, each published message is + confirmed using Publisher confirms RabbitMQ extention. """ Channel = Channel diff --git a/amqp/exceptions.py b/amqp/exceptions.py index c64df14..ed767b4 100644 --- a/amqp/exceptions.py +++ b/amqp/exceptions.py @@ -15,7 +15,7 @@ __all__ = [ 'ResourceLocked', 'PreconditionFailed', 'FrameError', 'FrameSyntaxError', 'InvalidCommand', 'ChannelNotOpen', 'UnexpectedFrame', 'ResourceError', 'NotAllowed', 'AMQPNotImplementedError', 'InternalError', - + 'MessageNacked', 'AMQPDeprecationWarning', ] @@ -24,6 +24,10 @@ class AMQPDeprecationWarning(UserWarning): """Warning for deprecated things.""" +class MessageNacked(Exception): + """Message was nacked by broker.""" + + @python_2_unicode_compatible class AMQPError(Exception): """Base class for all AMQP exceptions.""" diff --git a/t/unit/test_abstract_channel.py b/t/unit/test_abstract_channel.py index ca38e2d..233d1f6 100644 --- a/t/unit/test_abstract_channel.py +++ b/t/unit/test_abstract_channel.py @@ -97,14 +97,14 @@ class test_AbstractChannel: self.method.args = None p = self.c._pending[(50, 61)] = Mock(name='oneshot') self.c.dispatch_method((50, 61), 'payload', self.content) - p.assert_called_with(self.content) + p.assert_called_with((50, 61), self.content) def test_dispatch_method__one_shot_no_content(self): self.method.args = None self.method.content = None p = self.c._pending[(50, 61)] = Mock(name='oneshot') self.c.dispatch_method((50, 61), 'payload', self.content) - p.assert_called_with() + p.assert_called_with((50, 61)) assert not self.c._pending def test_dispatch_method__listeners(self): @@ -121,6 +121,6 @@ class test_AbstractChannel: p2 = self.c._pending[(50, 61)] = Mock(name='oneshot') self.c.dispatch_method((50, 61), 'payload', self.content) p1.assert_called_with(1, 2, 3, self.content) - p2.assert_called_with(1, 2, 3, self.content) + p2.assert_called_with((50, 61), 1, 2, 3, self.content) assert not self.c._pending assert self.c._callbacks[(50, 61)] diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index 7fc0aac..d6faff4 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -1,11 +1,13 @@ from __future__ import absolute_import, unicode_literals import pytest -from case import ContextMock, Mock, patch +from case import ContextMock, Mock, patch, ANY from amqp import spec +from amqp.platform import pack +from amqp.serialization import dumps from amqp.channel import Channel -from amqp.exceptions import ConsumerCancelled, NotFound +from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked class test_Channel: @@ -334,7 +336,56 @@ class test_Channel: assert self.c._confirm_selected self.c._basic_publish.assert_called_with(1, 2, arg=1) assert ret is self.c._basic_publish() - self.c.wait.assert_called_with(spec.Basic.Ack) + self.c.wait.assert_called_with( + [spec.Basic.Ack, spec.Basic.Nack], callback=ANY + ) + self.c.basic_publish_confirm(1, 2, arg=1) + + def test_basic_publish_confirm_nack(self): + # test checking whether library is handling correctly Nack confirms + # sent from RabbitMQ. Library must raise MessageNacked when server + # sent Nack message. + + # Nack frame construction + args = dumps('Lb', (1, False)) + frame = (b''.join([pack('>HH', *spec.Basic.Nack), args])) + + def wait(method, *args, **kwargs): + # Simple mock simulating registering callbacks of real wait method + for m in method: + self.c._pending[m] = kwargs['callback'] + + self.c._basic_publish = Mock(name='_basic_publish') + self.c.wait = Mock(name='wait', side_effect=wait) + + self.c.basic_publish_confirm(1, 2, arg=1) + + with pytest.raises(MessageNacked): + # Inject Nack to message handler + self.c.dispatch_method( + spec.Basic.Nack, frame, None + ) + + def test_basic_publsh_confirm_callback(self): + + def wait_nack(method, *args, **kwargs): + kwargs['callback'](spec.Basic.Nack) + + def wait_ack(method, *args, **kwargs): + kwargs['callback'](spec.Basic.Ack) + + self.c._basic_publish = Mock(name='_basic_publish') + self.c.wait = Mock(name='wait_nack', side_effect=wait_nack) + + with pytest.raises(MessageNacked): + # when callback is called with spec.Basic.Nack it must raise + # MessageNacked exception + self.c.basic_publish_confirm(1, 2, arg=1) + + self.c.wait = Mock(name='wait_ack', side_effect=wait_ack) + + # when callback is called with spec.Basic.Ack + # it must nost raise exception self.c.basic_publish_confirm(1, 2, arg=1) def test_basic_qos(self): @@ -405,3 +456,9 @@ class test_Channel: self.c.events['basic_ack'].add(callback) self.c._on_basic_ack(123, True) callback.assert_called_with(123, True) + + def test_on_basic_nack(self): + callback = Mock(name='callback') + self.c.events['basic_nack'].add(callback) + self.c._on_basic_nack(123, True) + callback.assert_called_with(123, True) |