diff options
author | Matus Valo <matusvalo@gmail.com> | 2019-06-12 07:28:53 +0000 |
---|---|---|
committer | Matus Valo <matusvalo@gmail.com> | 2019-06-12 22:43:37 +0000 |
commit | 99ec59ccb263cde370f6b318b92051d213bf4280 (patch) | |
tree | 7801e67e1305f3bcd403548be54977e12b5c16d3 | |
parent | 39e81d64667405325a55baeeb3aaa7988013b9b8 (diff) | |
download | py-amqp-connection_channel_close.tar.gz |
Ignore all methods except Close and Close-OK when channel/connection is closingconnection_channel_close
-rw-r--r-- | amqp/abstract_channel.py | 19 | ||||
-rw-r--r-- | amqp/channel.py | 4 | ||||
-rw-r--r-- | amqp/connection.py | 7 | ||||
-rw-r--r-- | t/integration/test_integration.py | 96 | ||||
-rw-r--r-- | t/unit/test_abstract_channel.py | 37 | ||||
-rw-r--r-- | t/unit/test_channel.py | 1 |
6 files changed, 161 insertions, 3 deletions
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py index 8530bec..5b6da0d 100644 --- a/amqp/abstract_channel.py +++ b/amqp/abstract_channel.py @@ -2,6 +2,8 @@ # Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>) from __future__ import absolute_import, unicode_literals +import logging + from vine import ensure_promise, promise from .exceptions import AMQPNotImplementedError, RecoverableConnectionError @@ -10,6 +12,12 @@ from .serialization import dumps, loads __all__ = ['AbstractChannel'] +AMQP_LOGGER = logging.getLogger('amqp') + +IGNORED_METHOD_DURING_CHANNEL_CLOSE = """\ +Received method %s during closing channel %s. This method will be ignored\ +""" + class AbstractChannel(object): """Superclass for Connection and Channel. @@ -91,6 +99,17 @@ class AbstractChannel(object): pending.pop(m, None) def dispatch_method(self, method_sig, payload, content): + if self.is_closing and method_sig not in ( + self._ALLOWED_METHODS_WHEN_CLOSING + ): + # When channel.close() was called we must ignore all methods except + # Channel.close and Channel.CloseOk + AMQP_LOGGER.warning( + IGNORED_METHOD_DURING_CHANNEL_CLOSE, + method_sig, self.channel_id + ) + return + if content and \ self.auto_decode and \ hasattr(content, 'content_encoding'): diff --git a/amqp/channel.py b/amqp/channel.py index d91ad33..ed7bc1a 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -97,6 +97,10 @@ class Channel(AbstractChannel): } _METHODS = {m.method_sig: m for m in _METHODS} + _ALLOWED_METHODS_WHEN_CLOSING = ( + spec.Channel.Close, spec.Channel.CloseOk + ) + def __init__(self, connection, channel_id=None, auto_decode=True, on_open=None): if channel_id: diff --git a/amqp/connection.py b/amqp/connection.py index 5af06e1..4286f24 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -162,6 +162,10 @@ class Connection(AbstractChannel): } _METHODS = {m.method_sig: m for m in _METHODS} + _ALLOWED_METHODS_WHEN_CLOSING = ( + spec.Connection.Close, spec.Connection.CloseOk + ) + connection_errors = ( ConnectionError, socket.error, @@ -576,10 +580,11 @@ class Connection(AbstractChannel): wait=spec.Connection.CloseOk, ) except (OSError, IOError, SSLError): - self.is_closing = False # close connection self.collect() raise + finally: + self.is_closing = False def _on_close(self, reply_code, reply_text, class_id, method_id): """Request a connection close. diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py index 738e233..cf7c55b 100644 --- a/t/integration/test_integration.py +++ b/t/integration/test_integration.py @@ -366,6 +366,34 @@ class test_connection: ) t.close.assert_called_once_with() + @patch('amqp.Connection._on_blocked') + def test_connecion_ignore_methods_during_close(self, on_blocked_mock): + # Test checking that py-amqp will discard any received methods + # except Close and Close-OK after sending Connecion.Close method + # to server. + frame_writer_cls_mock = Mock() + frame_writer_mock = frame_writer_cls_mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + frame_writer_mock.reset_mock() + # Inject CloseOk response from broker + transport_mock().read_frame.side_effect = [ + build_frame_type_1( + spec.Connection.Blocked, channel=0 + ), + build_frame_type_1( + spec.Connection.CloseOk + ) + ] + t = conn.transport + conn.close() + on_blocked_mock.assert_not_called() + frame_writer_mock.assert_called_once_with( + 1, 0, spec.Connection.Close, dumps('BsBB', (0, '', 0, 0)), None + ) + t.close.assert_called_once_with() + def test_connection_closed_by_broker(self): # Test that library response correctly CloseOk when # close method is received and _on_close_ok() method is called. @@ -413,6 +441,74 @@ class test_channel: conn.drain_events(0) callback_mock.assert_called_once() + def test_channel_ignore_methods_during_close(self): + # Test checking that py-amqp will discard any received methods + # except Close and Close-OK after sending Channel.Close method + # to server. + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg' + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + + channel_id = 1 + transport_mock().read_frame.side_effect = [ + # Inject Open Handshake + build_frame_type_1( + spec.Channel.OpenOk, + channel=channel_id, + args=(1, False), + arg_format='Lb' + ), + # Inject basic-deliver response + build_frame_type_1( + spec.Basic.Deliver, + channel=1, + arg_format='sLbss', + args=( + # consumer-tag, delivery-tag, redelivered, + consumer_tag, 1, False, + # exchange-name, routing-key + 'foo_exchange', 'routing-key' + ) + ), + build_frame_type_2( + channel=1, + body_len=12, + properties=b'0\x00\x00\x00\x00\x00\x01' + ), + build_frame_type_3( + channel=1, + body=b'Hello World!' + ), + # Inject close method + build_frame_type_1( + spec.Channel.CloseOk, + channel=channel_id + ), + ] + + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + + with patch('amqp.Channel._on_basic_deliver') as on_deliver_mock: + ch = conn.channel(channel_id=channel_id) + ch.close() + on_deliver_mock.assert_not_called() + frame_writer_mock.assert_has_calls( + [ + call( + 1, 1, spec.Channel.Open, dumps('s', ('',)), + None + ), + call( + 1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)), + None + ) + ] + ) + assert ch.is_open is False + def test_channel_open_close(self): # Test checking opening and closing channel frame_writer_cls_mock = Mock() diff --git a/t/unit/test_abstract_channel.py b/t/unit/test_abstract_channel.py index 01acf08..2703302 100644 --- a/t/unit/test_abstract_channel.py +++ b/t/unit/test_abstract_channel.py @@ -3,10 +3,13 @@ from __future__ import absolute_import, unicode_literals import pytest from vine import promise -from amqp.abstract_channel import AbstractChannel +from amqp import spec +from amqp.abstract_channel import ( + AbstractChannel, IGNORED_METHOD_DURING_CHANNEL_CLOSE +) from amqp.exceptions import AMQPNotImplementedError, RecoverableConnectionError from amqp.serialization import dumps -from case import Mock, patch +from case import Mock, patch, sentinel class test_AbstractChannel: @@ -124,3 +127,33 @@ class test_AbstractChannel: p2.assert_called_with((50, 61), 1, 2, 3, self.content) assert not self.c._pending assert self.c._callbacks[(50, 61)] + + @pytest.mark.parametrize( + "method", + ( + spec.Channel.Close, + spec.Channel.CloseOk, + spec.Basic.Deliver + ) + ) + def test_dispatch_method__closing_connection(self, method, caplog): + self.c._ALLOWED_METHODS_WHEN_CLOSING = ( + spec.Channel.Close, spec.Channel.CloseOk + ) + self.c.is_closing = True + with patch.object(self.c, '_METHODS'), \ + patch.object(self.c, '_callbacks'): + self.c.dispatch_method( + method, sentinel.PAYLOAD, sentinel.CONTENT + ) + if method in (spec.Channel.Close, spec.Channel.CloseOk): + self.c._METHODS.__getitem__.assert_called_once_with(method) + self.c._callbacks[method].assert_called_once() + else: + self.c._METHODS.__getitem__.assert_not_called() + self.c._callbacks[method].assert_not_called() + assert caplog.records[0].msg == \ + IGNORED_METHOD_DURING_CHANNEL_CLOSE + assert caplog.records[0].args[0] == method + assert caplog.records[0].args[1] == self.channel_id + assert caplog.records[0].levelname == 'WARNING' diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index 9cd9ed0..eb65644 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -88,6 +88,7 @@ class test_Channel: (30, 'text', spec.Queue.Declare[0], spec.Queue.Declare[1]), wait=spec.Channel.CloseOk, ) + assert self.c.is_closing is False assert self.c.connection is None def test_on_close(self): |