summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2018-10-19 17:30:48 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2018-10-19 21:30:48 +0600
commitf63f8fe9efbe3357741f32541ea2d7b063f7df09 (patch)
tree9ce8a17b163ffa61229902e46fd9fbc90ea34bc2
parent77be4f202ad03a0b9292d8743633e3390f87f9a7 (diff)
downloadpy-amqp-f63f8fe9efbe3357741f32541ea2d7b063f7df09.tar.gz
Handle negative acknowledgments sent by RMQ (#208)
* Handle negative acknowledgments sent by RMQ * Make flake8 happy * Update documentation * Fix redundant parameter in _on_basic_nack + added unittests * Increase coverage of unittests * Move MessageNacked to exceptions.py * Make flake8 happy * Make flake8 happy
-rw-r--r--amqp/abstract_channel.py11
-rw-r--r--amqp/channel.py22
-rw-r--r--amqp/connection.py4
-rw-r--r--amqp/exceptions.py6
-rw-r--r--t/unit/test_abstract_channel.py6
-rw-r--r--t/unit/test_channel.py63
6 files changed, 97 insertions, 15 deletions
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py
index aaad554..138d472 100644
--- a/amqp/abstract_channel.py
+++ b/amqp/abstract_channel.py
@@ -106,17 +106,13 @@ class AbstractChannel(object):
try:
listeners = [self._callbacks[method_sig]]
except KeyError:
- listeners = None
+ listeners = []
+ one_shot = None
try:
one_shot = self._pending.pop(method_sig)
except KeyError:
if not listeners:
return
- else:
- if listeners is None:
- listeners = [one_shot]
- else:
- listeners.append(one_shot)
args = []
if amqp_method.args:
@@ -127,6 +123,9 @@ class AbstractChannel(object):
for listener in listeners:
listener(*args)
+ if one_shot:
+ one_shot(method_sig, *args)
+
#: Placeholder, the concrete implementations will have to
#: supply their own versions of _METHOD_MAP
_METHODS = {}
diff --git a/amqp/channel.py b/amqp/channel.py
index 24255bc..db7b8cc 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -13,7 +13,7 @@ from . import spec
from .abstract_channel import AbstractChannel
from .exceptions import (ChannelError, ConsumerCancelled,
RecoverableChannelError, RecoverableConnectionError,
- error_for_code)
+ error_for_code, MessageNacked)
from .five import Queue
from .protocol import queue_declare_ok_t
@@ -93,6 +93,7 @@ class Channel(AbstractChannel):
spec.method(spec.Tx.SelectOk),
spec.method(spec.Confirm.SelectOk),
spec.method(spec.Basic.Ack, 'Lb'),
+ spec.method(spec.Basic.Nack, 'Lb'),
}
_METHODS = {m.method_sig: m for m in _METHODS}
@@ -138,6 +139,7 @@ class Channel(AbstractChannel):
spec.Basic.Deliver: self._on_basic_deliver,
spec.Basic.Return: self._on_basic_return,
spec.Basic.Ack: self._on_basic_ack,
+ spec.Basic.Nack: self._on_basic_nack,
})
def collect(self):
@@ -1665,6 +1667,11 @@ class Channel(AbstractChannel):
configuration and distributed to any active consumers when the
transaction, if any, is committed.
+ When channel is in confirm mode (when Connection parameter
+ confirm_publish is set to True), each message is confirmed. When
+ broker rejects published message (e.g. due internal broker
+ constrains), MessageNacked exception is raised.
+
PARAMETERS:
exchange: shortstr
@@ -1736,11 +1743,18 @@ class Channel(AbstractChannel):
basic_publish = _basic_publish
def basic_publish_confirm(self, *args, **kwargs):
+
+ def confirm_handler(method, *args):
+ # When RMQ nacks message we are raising MessageNacked exception
+ if method == spec.Basic.Nack:
+ raise MessageNacked()
+
if not self._confirm_selected:
self._confirm_selected = True
self.confirm_select()
ret = self._basic_publish(*args, **kwargs)
- self.wait(spec.Basic.Ack)
+ # Waiting for confirmation of message.
+ self.wait([spec.Basic.Ack, spec.Basic.Nack], callback=confirm_handler)
return ret
def basic_qos(self, prefetch_size, prefetch_count, a_global,
@@ -2037,3 +2051,7 @@ class Channel(AbstractChannel):
def _on_basic_ack(self, delivery_tag, multiple):
for callback in self.events['basic_ack']:
callback(delivery_tag, multiple)
+
+ def _on_basic_nack(self, delivery_tag, multiple):
+ for callback in self.events['basic_nack']:
+ callback(delivery_tag, multiple)
diff --git a/amqp/connection.py b/amqp/connection.py
index ea56a2d..6e47798 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -97,6 +97,10 @@ class Connection(AbstractChannel):
The "socket_settings" parameter is a dictionary defining tcp
settings which will be applied as socket options.
+
+ When "confirm_publish" is set to True, the channel is put to
+ confirm mode. In this mode, each published message is
+ confirmed using Publisher confirms RabbitMQ extention.
"""
Channel = Channel
diff --git a/amqp/exceptions.py b/amqp/exceptions.py
index c64df14..ed767b4 100644
--- a/amqp/exceptions.py
+++ b/amqp/exceptions.py
@@ -15,7 +15,7 @@ __all__ = [
'ResourceLocked', 'PreconditionFailed', 'FrameError', 'FrameSyntaxError',
'InvalidCommand', 'ChannelNotOpen', 'UnexpectedFrame', 'ResourceError',
'NotAllowed', 'AMQPNotImplementedError', 'InternalError',
-
+ 'MessageNacked',
'AMQPDeprecationWarning',
]
@@ -24,6 +24,10 @@ class AMQPDeprecationWarning(UserWarning):
"""Warning for deprecated things."""
+class MessageNacked(Exception):
+ """Message was nacked by broker."""
+
+
@python_2_unicode_compatible
class AMQPError(Exception):
"""Base class for all AMQP exceptions."""
diff --git a/t/unit/test_abstract_channel.py b/t/unit/test_abstract_channel.py
index ca38e2d..233d1f6 100644
--- a/t/unit/test_abstract_channel.py
+++ b/t/unit/test_abstract_channel.py
@@ -97,14 +97,14 @@ class test_AbstractChannel:
self.method.args = None
p = self.c._pending[(50, 61)] = Mock(name='oneshot')
self.c.dispatch_method((50, 61), 'payload', self.content)
- p.assert_called_with(self.content)
+ p.assert_called_with((50, 61), self.content)
def test_dispatch_method__one_shot_no_content(self):
self.method.args = None
self.method.content = None
p = self.c._pending[(50, 61)] = Mock(name='oneshot')
self.c.dispatch_method((50, 61), 'payload', self.content)
- p.assert_called_with()
+ p.assert_called_with((50, 61))
assert not self.c._pending
def test_dispatch_method__listeners(self):
@@ -121,6 +121,6 @@ class test_AbstractChannel:
p2 = self.c._pending[(50, 61)] = Mock(name='oneshot')
self.c.dispatch_method((50, 61), 'payload', self.content)
p1.assert_called_with(1, 2, 3, self.content)
- p2.assert_called_with(1, 2, 3, self.content)
+ p2.assert_called_with((50, 61), 1, 2, 3, self.content)
assert not self.c._pending
assert self.c._callbacks[(50, 61)]
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index 7fc0aac..d6faff4 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -1,11 +1,13 @@
from __future__ import absolute_import, unicode_literals
import pytest
-from case import ContextMock, Mock, patch
+from case import ContextMock, Mock, patch, ANY
from amqp import spec
+from amqp.platform import pack
+from amqp.serialization import dumps
from amqp.channel import Channel
-from amqp.exceptions import ConsumerCancelled, NotFound
+from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked
class test_Channel:
@@ -334,7 +336,56 @@ class test_Channel:
assert self.c._confirm_selected
self.c._basic_publish.assert_called_with(1, 2, arg=1)
assert ret is self.c._basic_publish()
- self.c.wait.assert_called_with(spec.Basic.Ack)
+ self.c.wait.assert_called_with(
+ [spec.Basic.Ack, spec.Basic.Nack], callback=ANY
+ )
+ self.c.basic_publish_confirm(1, 2, arg=1)
+
+ def test_basic_publish_confirm_nack(self):
+ # test checking whether library is handling correctly Nack confirms
+ # sent from RabbitMQ. Library must raise MessageNacked when server
+ # sent Nack message.
+
+ # Nack frame construction
+ args = dumps('Lb', (1, False))
+ frame = (b''.join([pack('>HH', *spec.Basic.Nack), args]))
+
+ def wait(method, *args, **kwargs):
+ # Simple mock simulating registering callbacks of real wait method
+ for m in method:
+ self.c._pending[m] = kwargs['callback']
+
+ self.c._basic_publish = Mock(name='_basic_publish')
+ self.c.wait = Mock(name='wait', side_effect=wait)
+
+ self.c.basic_publish_confirm(1, 2, arg=1)
+
+ with pytest.raises(MessageNacked):
+ # Inject Nack to message handler
+ self.c.dispatch_method(
+ spec.Basic.Nack, frame, None
+ )
+
+ def test_basic_publsh_confirm_callback(self):
+
+ def wait_nack(method, *args, **kwargs):
+ kwargs['callback'](spec.Basic.Nack)
+
+ def wait_ack(method, *args, **kwargs):
+ kwargs['callback'](spec.Basic.Ack)
+
+ self.c._basic_publish = Mock(name='_basic_publish')
+ self.c.wait = Mock(name='wait_nack', side_effect=wait_nack)
+
+ with pytest.raises(MessageNacked):
+ # when callback is called with spec.Basic.Nack it must raise
+ # MessageNacked exception
+ self.c.basic_publish_confirm(1, 2, arg=1)
+
+ self.c.wait = Mock(name='wait_ack', side_effect=wait_ack)
+
+ # when callback is called with spec.Basic.Ack
+ # it must nost raise exception
self.c.basic_publish_confirm(1, 2, arg=1)
def test_basic_qos(self):
@@ -405,3 +456,9 @@ class test_Channel:
self.c.events['basic_ack'].add(callback)
self.c._on_basic_ack(123, True)
callback.assert_called_with(123, True)
+
+ def test_on_basic_nack(self):
+ callback = Mock(name='callback')
+ self.c.events['basic_nack'].add(callback)
+ self.c._on_basic_nack(123, True)
+ callback.assert_called_with(123, True)