summaryrefslogtreecommitdiff
path: root/t/integration/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 't/integration/test_integration.py')
-rw-r--r--t/integration/test_integration.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py
index 738e233..cf7c55b 100644
--- a/t/integration/test_integration.py
+++ b/t/integration/test_integration.py
@@ -366,6 +366,34 @@ class test_connection:
)
t.close.assert_called_once_with()
+ @patch('amqp.Connection._on_blocked')
+ def test_connecion_ignore_methods_during_close(self, on_blocked_mock):
+ # Test checking that py-amqp will discard any received methods
+ # except Close and Close-OK after sending Connecion.Close method
+ # to server.
+ frame_writer_cls_mock = Mock()
+ frame_writer_mock = frame_writer_cls_mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ frame_writer_mock.reset_mock()
+ # Inject CloseOk response from broker
+ transport_mock().read_frame.side_effect = [
+ build_frame_type_1(
+ spec.Connection.Blocked, channel=0
+ ),
+ build_frame_type_1(
+ spec.Connection.CloseOk
+ )
+ ]
+ t = conn.transport
+ conn.close()
+ on_blocked_mock.assert_not_called()
+ frame_writer_mock.assert_called_once_with(
+ 1, 0, spec.Connection.Close, dumps('BsBB', (0, '', 0, 0)), None
+ )
+ t.close.assert_called_once_with()
+
def test_connection_closed_by_broker(self):
# Test that library response correctly CloseOk when
# close method is received and _on_close_ok() method is called.
@@ -413,6 +441,74 @@ class test_channel:
conn.drain_events(0)
callback_mock.assert_called_once()
+ def test_channel_ignore_methods_during_close(self):
+ # Test checking that py-amqp will discard any received methods
+ # except Close and Close-OK after sending Channel.Close method
+ # to server.
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+
+ channel_id = 1
+ transport_mock().read_frame.side_effect = [
+ # Inject Open Handshake
+ build_frame_type_1(
+ spec.Channel.OpenOk,
+ channel=channel_id,
+ args=(1, False),
+ arg_format='Lb'
+ ),
+ # Inject basic-deliver response
+ build_frame_type_1(
+ spec.Basic.Deliver,
+ channel=1,
+ arg_format='sLbss',
+ args=(
+ # consumer-tag, delivery-tag, redelivered,
+ consumer_tag, 1, False,
+ # exchange-name, routing-key
+ 'foo_exchange', 'routing-key'
+ )
+ ),
+ build_frame_type_2(
+ channel=1,
+ body_len=12,
+ properties=b'0\x00\x00\x00\x00\x00\x01'
+ ),
+ build_frame_type_3(
+ channel=1,
+ body=b'Hello World!'
+ ),
+ # Inject close method
+ build_frame_type_1(
+ spec.Channel.CloseOk,
+ channel=channel_id
+ ),
+ ]
+
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+
+ with patch('amqp.Channel._on_basic_deliver') as on_deliver_mock:
+ ch = conn.channel(channel_id=channel_id)
+ ch.close()
+ on_deliver_mock.assert_not_called()
+ frame_writer_mock.assert_has_calls(
+ [
+ call(
+ 1, 1, spec.Channel.Open, dumps('s', ('',)),
+ None
+ ),
+ call(
+ 1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)),
+ None
+ )
+ ]
+ )
+ assert ch.is_open is False
+
def test_channel_open_close(self):
# Test checking opening and closing channel
frame_writer_cls_mock = Mock()