diff options
author | Ask Solem <ask@celeryproject.org> | 2015-09-16 12:19:38 -0700 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-09-16 12:19:38 -0700 |
commit | e5f3e1ef9b2c934856fb4cd4fa6d581223827bf4 (patch) | |
tree | bd837d1c800a40c56aae56b1fc0ced1b44c71060 | |
parent | f04e98540d9ae3c0233214e7b0f5978049b91f0b (diff) | |
download | py-amqp-e5f3e1ef9b2c934856fb4cd4fa6d581223827bf4.tar.gz |
basic_publish now raises NotConfirmed on basic.nack
-rw-r--r-- | amqp/channel.py | 21 | ||||
-rw-r--r-- | amqp/exceptions.py | 6 | ||||
-rw-r--r-- | amqp/tests/test_channel.py | 35 |
3 files changed, 57 insertions, 5 deletions
diff --git a/amqp/channel.py b/amqp/channel.py index 896588e..b0a6b5a 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -23,7 +23,8 @@ from warnings import warn from .abstract_channel import AbstractChannel from .exceptions import ( - ChannelError, ConnectionError, ConsumerCancelled, error_for_code, + ChannelError, ConnectionError, + ConsumerCancelled, NotConfirmed, error_for_code, ) from .five import Queue from .protocol import basic_return_t, queue_declare_ok_t @@ -2135,7 +2136,8 @@ class Channel(AbstractChannel): self._confirm_selected = True self.confirm_select() ret = self._basic_publish(*args, **kwargs) - self.wait([(60, 80)]) + # Basic.Ack / Basic.Nack + self.wait([(60, 80), (60, 120)]) return ret def basic_qos(self, prefetch_size, prefetch_count, a_global): @@ -2506,8 +2508,18 @@ class Channel(AbstractChannel): def _basic_ack_recv(self, args): delivery_tag = args.read_longlong() multiple = args.read_bit() - for callback in self.events['basic_ack']: - callback(delivery_tag, multiple) + self._apply_callbacks('basic_ack', delivery_tag, multiple) + + def _apply_callbacks(self, event, *args): + return [callback(*args) for callback in self.events[event]] + + def _basic_nack(self, args): + delivery_tag = args.read_longlong() + multiple = args.read_bit() + requeue = args.read_bit() + if not self._apply_callbacks( + 'basic_nack', delivery_tag, multiple, requeue): + raise NotConfirmed(delivery_tag, (60, 120), 'basic.nack') _METHOD_MAP = { (20, 11): _open_ok, @@ -2533,6 +2545,7 @@ class Channel(AbstractChannel): (60, 71): _basic_get_ok, (60, 72): _basic_get_empty, (60, 80): _basic_ack_recv, + (60, 120): _basic_nack, (60, 111): _basic_recover_ok, (85, 11): _confirm_select_ok, (90, 11): _tx_select_ok, diff --git a/amqp/exceptions.py b/amqp/exceptions.py index 50c12c6..4f1db66 100644 --- a/amqp/exceptions.py +++ b/amqp/exceptions.py @@ -27,7 +27,7 @@ __all__ = [ 'ConnectionForced', 'InvalidPath', 'AccessRefused', 'NotFound', 'ResourceLocked', 'PreconditionFailed', 'FrameError', 'FrameSyntaxError', 'InvalidCommand', 'ChannelNotOpen', 'UnexpectedFrame', 'ResourceError', - 'NotAllowed', 'AMQPNotImplementedError', 'InternalError', + 'NotConfirmed', 'NotAllowed', 'AMQPNotImplementedError', 'InternalError', ] @@ -112,6 +112,10 @@ class NotFound(RecoverableChannelError): code = 404 +class NotConfirmed(RecoverableConnectionError): + pass + + class ResourceLocked(RecoverableChannelError): code = 405 diff --git a/amqp/tests/test_channel.py b/amqp/tests/test_channel.py new file mode 100644 index 0000000..1baa159 --- /dev/null +++ b/amqp/tests/test_channel.py @@ -0,0 +1,35 @@ +from __future__ import absolute_import + +from collections import defaultdict + +from amqp.channel import Channel +from amqp.exceptions import NotConfirmed +from amqp.serialization import AMQPWriter, AMQPReader + +from amqp.tests.case import Case, Mock + + +class NoOpenChannel(Channel): + + def _x_open(self): + pass + + +class test_Channel(Case): + + def setUp(self): + self.args = AMQPWriter() + self.connection = Mock(name='connection') + self.connection.channels = defaultdict(lambda: None) + self.channel = NoOpenChannel(self.connection, channel_id=1) + + def test_basic_nack(self, delivery_tag=3172312312): + self.args.write_longlong(delivery_tag) + self.args.write_bit(0) + self.args.write_bit(0) + with self.assertRaises(NotConfirmed): + self.channel._basic_nack(AMQPReader(self.args.getvalue())) + callback = Mock(name='callback') + self.channel.events['basic_nack'].add(callback) + self.channel._basic_nack(AMQPReader(self.args.getvalue())) + callback.assert_called_with(delivery_tag, False, False) |