summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2019-06-12 07:28:53 +0000
committerMatus Valo <matusvalo@gmail.com>2019-06-12 22:43:37 +0000
commit99ec59ccb263cde370f6b318b92051d213bf4280 (patch)
tree7801e67e1305f3bcd403548be54977e12b5c16d3
parent39e81d64667405325a55baeeb3aaa7988013b9b8 (diff)
downloadpy-amqp-connection_channel_close.tar.gz
Ignore all methods except Close and Close-OK when channel/connection is closingconnection_channel_close
-rw-r--r--amqp/abstract_channel.py19
-rw-r--r--amqp/channel.py4
-rw-r--r--amqp/connection.py7
-rw-r--r--t/integration/test_integration.py96
-rw-r--r--t/unit/test_abstract_channel.py37
-rw-r--r--t/unit/test_channel.py1
6 files changed, 161 insertions, 3 deletions
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py
index 8530bec..5b6da0d 100644
--- a/amqp/abstract_channel.py
+++ b/amqp/abstract_channel.py
@@ -2,6 +2,8 @@
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>)
from __future__ import absolute_import, unicode_literals
+import logging
+
from vine import ensure_promise, promise
from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
@@ -10,6 +12,12 @@ from .serialization import dumps, loads
__all__ = ['AbstractChannel']
+AMQP_LOGGER = logging.getLogger('amqp')
+
+IGNORED_METHOD_DURING_CHANNEL_CLOSE = """\
+Received method %s during closing channel %s. This method will be ignored\
+"""
+
class AbstractChannel(object):
"""Superclass for Connection and Channel.
@@ -91,6 +99,17 @@ class AbstractChannel(object):
pending.pop(m, None)
def dispatch_method(self, method_sig, payload, content):
+ if self.is_closing and method_sig not in (
+ self._ALLOWED_METHODS_WHEN_CLOSING
+ ):
+ # When channel.close() was called we must ignore all methods except
+ # Channel.close and Channel.CloseOk
+ AMQP_LOGGER.warning(
+ IGNORED_METHOD_DURING_CHANNEL_CLOSE,
+ method_sig, self.channel_id
+ )
+ return
+
if content and \
self.auto_decode and \
hasattr(content, 'content_encoding'):
diff --git a/amqp/channel.py b/amqp/channel.py
index d91ad33..ed7bc1a 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -97,6 +97,10 @@ class Channel(AbstractChannel):
}
_METHODS = {m.method_sig: m for m in _METHODS}
+ _ALLOWED_METHODS_WHEN_CLOSING = (
+ spec.Channel.Close, spec.Channel.CloseOk
+ )
+
def __init__(self, connection,
channel_id=None, auto_decode=True, on_open=None):
if channel_id:
diff --git a/amqp/connection.py b/amqp/connection.py
index 5af06e1..4286f24 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -162,6 +162,10 @@ class Connection(AbstractChannel):
}
_METHODS = {m.method_sig: m for m in _METHODS}
+ _ALLOWED_METHODS_WHEN_CLOSING = (
+ spec.Connection.Close, spec.Connection.CloseOk
+ )
+
connection_errors = (
ConnectionError,
socket.error,
@@ -576,10 +580,11 @@ class Connection(AbstractChannel):
wait=spec.Connection.CloseOk,
)
except (OSError, IOError, SSLError):
- self.is_closing = False
# close connection
self.collect()
raise
+ finally:
+ self.is_closing = False
def _on_close(self, reply_code, reply_text, class_id, method_id):
"""Request a connection close.
diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py
index 738e233..cf7c55b 100644
--- a/t/integration/test_integration.py
+++ b/t/integration/test_integration.py
@@ -366,6 +366,34 @@ class test_connection:
)
t.close.assert_called_once_with()
+ @patch('amqp.Connection._on_blocked')
+ def test_connecion_ignore_methods_during_close(self, on_blocked_mock):
+ # Test checking that py-amqp will discard any received methods
+ # except Close and Close-OK after sending Connecion.Close method
+ # to server.
+ frame_writer_cls_mock = Mock()
+ frame_writer_mock = frame_writer_cls_mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ frame_writer_mock.reset_mock()
+ # Inject CloseOk response from broker
+ transport_mock().read_frame.side_effect = [
+ build_frame_type_1(
+ spec.Connection.Blocked, channel=0
+ ),
+ build_frame_type_1(
+ spec.Connection.CloseOk
+ )
+ ]
+ t = conn.transport
+ conn.close()
+ on_blocked_mock.assert_not_called()
+ frame_writer_mock.assert_called_once_with(
+ 1, 0, spec.Connection.Close, dumps('BsBB', (0, '', 0, 0)), None
+ )
+ t.close.assert_called_once_with()
+
def test_connection_closed_by_broker(self):
# Test that library response correctly CloseOk when
# close method is received and _on_close_ok() method is called.
@@ -413,6 +441,74 @@ class test_channel:
conn.drain_events(0)
callback_mock.assert_called_once()
+ def test_channel_ignore_methods_during_close(self):
+ # Test checking that py-amqp will discard any received methods
+ # except Close and Close-OK after sending Channel.Close method
+ # to server.
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+
+ channel_id = 1
+ transport_mock().read_frame.side_effect = [
+ # Inject Open Handshake
+ build_frame_type_1(
+ spec.Channel.OpenOk,
+ channel=channel_id,
+ args=(1, False),
+ arg_format='Lb'
+ ),
+ # Inject basic-deliver response
+ build_frame_type_1(
+ spec.Basic.Deliver,
+ channel=1,
+ arg_format='sLbss',
+ args=(
+ # consumer-tag, delivery-tag, redelivered,
+ consumer_tag, 1, False,
+ # exchange-name, routing-key
+ 'foo_exchange', 'routing-key'
+ )
+ ),
+ build_frame_type_2(
+ channel=1,
+ body_len=12,
+ properties=b'0\x00\x00\x00\x00\x00\x01'
+ ),
+ build_frame_type_3(
+ channel=1,
+ body=b'Hello World!'
+ ),
+ # Inject close method
+ build_frame_type_1(
+ spec.Channel.CloseOk,
+ channel=channel_id
+ ),
+ ]
+
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+
+ with patch('amqp.Channel._on_basic_deliver') as on_deliver_mock:
+ ch = conn.channel(channel_id=channel_id)
+ ch.close()
+ on_deliver_mock.assert_not_called()
+ frame_writer_mock.assert_has_calls(
+ [
+ call(
+ 1, 1, spec.Channel.Open, dumps('s', ('',)),
+ None
+ ),
+ call(
+ 1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)),
+ None
+ )
+ ]
+ )
+ assert ch.is_open is False
+
def test_channel_open_close(self):
# Test checking opening and closing channel
frame_writer_cls_mock = Mock()
diff --git a/t/unit/test_abstract_channel.py b/t/unit/test_abstract_channel.py
index 01acf08..2703302 100644
--- a/t/unit/test_abstract_channel.py
+++ b/t/unit/test_abstract_channel.py
@@ -3,10 +3,13 @@ from __future__ import absolute_import, unicode_literals
import pytest
from vine import promise
-from amqp.abstract_channel import AbstractChannel
+from amqp import spec
+from amqp.abstract_channel import (
+ AbstractChannel, IGNORED_METHOD_DURING_CHANNEL_CLOSE
+)
from amqp.exceptions import AMQPNotImplementedError, RecoverableConnectionError
from amqp.serialization import dumps
-from case import Mock, patch
+from case import Mock, patch, sentinel
class test_AbstractChannel:
@@ -124,3 +127,33 @@ class test_AbstractChannel:
p2.assert_called_with((50, 61), 1, 2, 3, self.content)
assert not self.c._pending
assert self.c._callbacks[(50, 61)]
+
+ @pytest.mark.parametrize(
+ "method",
+ (
+ spec.Channel.Close,
+ spec.Channel.CloseOk,
+ spec.Basic.Deliver
+ )
+ )
+ def test_dispatch_method__closing_connection(self, method, caplog):
+ self.c._ALLOWED_METHODS_WHEN_CLOSING = (
+ spec.Channel.Close, spec.Channel.CloseOk
+ )
+ self.c.is_closing = True
+ with patch.object(self.c, '_METHODS'), \
+ patch.object(self.c, '_callbacks'):
+ self.c.dispatch_method(
+ method, sentinel.PAYLOAD, sentinel.CONTENT
+ )
+ if method in (spec.Channel.Close, spec.Channel.CloseOk):
+ self.c._METHODS.__getitem__.assert_called_once_with(method)
+ self.c._callbacks[method].assert_called_once()
+ else:
+ self.c._METHODS.__getitem__.assert_not_called()
+ self.c._callbacks[method].assert_not_called()
+ assert caplog.records[0].msg == \
+ IGNORED_METHOD_DURING_CHANNEL_CLOSE
+ assert caplog.records[0].args[0] == method
+ assert caplog.records[0].args[1] == self.channel_id
+ assert caplog.records[0].levelname == 'WARNING'
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index 9cd9ed0..eb65644 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -88,6 +88,7 @@ class test_Channel:
(30, 'text', spec.Queue.Declare[0], spec.Queue.Declare[1]),
wait=spec.Channel.CloseOk,
)
+ assert self.c.is_closing is False
assert self.c.connection is None
def test_on_close(self):