summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-09-16 12:19:38 -0700
committerAsk Solem <ask@celeryproject.org>2015-09-16 12:19:38 -0700
commite5f3e1ef9b2c934856fb4cd4fa6d581223827bf4 (patch)
treebd837d1c800a40c56aae56b1fc0ced1b44c71060
parentf04e98540d9ae3c0233214e7b0f5978049b91f0b (diff)
downloadpy-amqp-e5f3e1ef9b2c934856fb4cd4fa6d581223827bf4.tar.gz
basic_publish now raises NotConfirmed on basic.nack
-rw-r--r--amqp/channel.py21
-rw-r--r--amqp/exceptions.py6
-rw-r--r--amqp/tests/test_channel.py35
3 files changed, 57 insertions, 5 deletions
diff --git a/amqp/channel.py b/amqp/channel.py
index 896588e..b0a6b5a 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -23,7 +23,8 @@ from warnings import warn
from .abstract_channel import AbstractChannel
from .exceptions import (
- ChannelError, ConnectionError, ConsumerCancelled, error_for_code,
+ ChannelError, ConnectionError,
+ ConsumerCancelled, NotConfirmed, error_for_code,
)
from .five import Queue
from .protocol import basic_return_t, queue_declare_ok_t
@@ -2135,7 +2136,8 @@ class Channel(AbstractChannel):
self._confirm_selected = True
self.confirm_select()
ret = self._basic_publish(*args, **kwargs)
- self.wait([(60, 80)])
+ # Basic.Ack / Basic.Nack
+ self.wait([(60, 80), (60, 120)])
return ret
def basic_qos(self, prefetch_size, prefetch_count, a_global):
@@ -2506,8 +2508,18 @@ class Channel(AbstractChannel):
def _basic_ack_recv(self, args):
delivery_tag = args.read_longlong()
multiple = args.read_bit()
- for callback in self.events['basic_ack']:
- callback(delivery_tag, multiple)
+ self._apply_callbacks('basic_ack', delivery_tag, multiple)
+
+ def _apply_callbacks(self, event, *args):
+ return [callback(*args) for callback in self.events[event]]
+
+ def _basic_nack(self, args):
+ delivery_tag = args.read_longlong()
+ multiple = args.read_bit()
+ requeue = args.read_bit()
+ if not self._apply_callbacks(
+ 'basic_nack', delivery_tag, multiple, requeue):
+ raise NotConfirmed(delivery_tag, (60, 120), 'basic.nack')
_METHOD_MAP = {
(20, 11): _open_ok,
@@ -2533,6 +2545,7 @@ class Channel(AbstractChannel):
(60, 71): _basic_get_ok,
(60, 72): _basic_get_empty,
(60, 80): _basic_ack_recv,
+ (60, 120): _basic_nack,
(60, 111): _basic_recover_ok,
(85, 11): _confirm_select_ok,
(90, 11): _tx_select_ok,
diff --git a/amqp/exceptions.py b/amqp/exceptions.py
index 50c12c6..4f1db66 100644
--- a/amqp/exceptions.py
+++ b/amqp/exceptions.py
@@ -27,7 +27,7 @@ __all__ = [
'ConnectionForced', 'InvalidPath', 'AccessRefused', 'NotFound',
'ResourceLocked', 'PreconditionFailed', 'FrameError', 'FrameSyntaxError',
'InvalidCommand', 'ChannelNotOpen', 'UnexpectedFrame', 'ResourceError',
- 'NotAllowed', 'AMQPNotImplementedError', 'InternalError',
+ 'NotConfirmed', 'NotAllowed', 'AMQPNotImplementedError', 'InternalError',
]
@@ -112,6 +112,10 @@ class NotFound(RecoverableChannelError):
code = 404
+class NotConfirmed(RecoverableConnectionError):
+ pass
+
+
class ResourceLocked(RecoverableChannelError):
code = 405
diff --git a/amqp/tests/test_channel.py b/amqp/tests/test_channel.py
new file mode 100644
index 0000000..1baa159
--- /dev/null
+++ b/amqp/tests/test_channel.py
@@ -0,0 +1,35 @@
+from __future__ import absolute_import
+
+from collections import defaultdict
+
+from amqp.channel import Channel
+from amqp.exceptions import NotConfirmed
+from amqp.serialization import AMQPWriter, AMQPReader
+
+from amqp.tests.case import Case, Mock
+
+
+class NoOpenChannel(Channel):
+
+ def _x_open(self):
+ pass
+
+
+class test_Channel(Case):
+
+ def setUp(self):
+ self.args = AMQPWriter()
+ self.connection = Mock(name='connection')
+ self.connection.channels = defaultdict(lambda: None)
+ self.channel = NoOpenChannel(self.connection, channel_id=1)
+
+ def test_basic_nack(self, delivery_tag=3172312312):
+ self.args.write_longlong(delivery_tag)
+ self.args.write_bit(0)
+ self.args.write_bit(0)
+ with self.assertRaises(NotConfirmed):
+ self.channel._basic_nack(AMQPReader(self.args.getvalue()))
+ callback = Mock(name='callback')
+ self.channel.events['basic_nack'].add(callback)
+ self.channel._basic_nack(AMQPReader(self.args.getvalue()))
+ callback.assert_called_with(delivery_tag, False, False)