diff options
author | Matus Valo <matusvalo@gmail.com> | 2019-01-15 15:36:10 -0800 |
---|---|---|
committer | Matus Valo <matusvalo@gmail.com> | 2019-01-15 15:36:10 -0800 |
commit | 8f153ff690a7c17fcb9f681f46bd1face2e96d94 (patch) | |
tree | feb00d3569402a1a636b57c7f3b58f7fd9bd69a9 | |
parent | f507172c1fa88e6c6f50e36762876b3116f4715e (diff) | |
download | py-amqp-fix-243.tar.gz |
Fix crash in basic_publish when broker does not support connection.blocked capabilityfix-243
-rw-r--r-- | amqp/channel.py | 5 | ||||
-rw-r--r-- | t/integration/test_integration.py | 140 | ||||
-rw-r--r-- | t/unit/test_channel.py | 25 |
3 files changed, 162 insertions, 8 deletions
diff --git a/amqp/channel.py b/amqp/channel.py index e4d06ab..39d4cb9 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -1755,8 +1755,9 @@ class Channel(AbstractChannel): raise RecoverableConnectionError( 'basic_publish: connection closed') - client_properties = self.connection.client_properties - if client_properties['capabilities']['connection.blocked']: + capabilities = self.connection.\ + client_properties.get('capabilities', {}) + if capabilities.get('connection.blocked', False): try: # Check if an event was sent, such as the out of memory message self.connection.drain_events(timeout=0) diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py index 57d5e62..2a6228d 100644 --- a/t/integration/test_integration.py +++ b/t/integration/test_integration.py @@ -61,17 +61,17 @@ queue_declare_error_testdata = ( ), ) -CLIENT_CAPABILITIES = { +CLIENT_PROPERTIES = { 'product': 'py-amqp', 'product_version': amqp.__version__, 'capabilities': { 'consumer_cancel_notify': True, 'connection.blocked': True, 'authentication_failure_close': True - } + }, } -SERVER_CAPABILITIES = { +SERVER_PROPERTIES = { 'capabilities': { 'publisher_confirms': True, 'exchange_exchange_bindings': True, @@ -125,13 +125,16 @@ class DataComparator(object): return tuple(values) == tuple(self.items) -def handshake(conn, transport_mock): +def handshake(conn, transport_mock, server_properties=None): # Helper function simulating connection handshake with server + if server_properties is None: + server_properties = SERVER_PROPERTIES + transport_mock().read_frame.side_effect = [ build_frame_type_1( spec.Connection.Start, channel=0, args=( - 0, 9, SERVER_CAPABILITIES, 'AMQPLAIN PLAIN', 'en_US' + 0, 9, server_properties, 'AMQPLAIN PLAIN', 'en_US' ), arg_format='ooFSS' ), @@ -194,7 +197,131 @@ class test_connection: DataComparator( 'FsSs', ( - CLIENT_CAPABILITIES, 'AMQPLAIN', + CLIENT_PROPERTIES, 'AMQPLAIN', + security_mechanism, + 'en_US' + ) + ), + None + ), + call( + 1, 0, spec.Connection.TuneOk, + dumps( + 'BlB', + (conn.channel_max, conn.frame_max, conn.heartbeat) + ), + None + ), + call( + 1, 0, spec.Connection.Open, + dumps('ssb', (conn.virtual_host, '', False)), + None + ) + ] + ) + assert conn.client_properties == CLIENT_PROPERTIES + + def test_connect_no_capabilities(self): + # Test checking connection handshake with broker + # not supporting capabilities + frame_writer_cls_mock = Mock() + on_open_mock = Mock() + frame_writer_mock = frame_writer_cls_mock() + conn = Connection( + frame_writer=frame_writer_cls_mock, on_open=on_open_mock + ) + + with patch.object(conn, 'Transport') as transport_mock: + server_properties = dict(SERVER_PROPERTIES) + del server_properties['capabilities'] + client_properties = dict(CLIENT_PROPERTIES) + del client_properties['capabilities'] + + handshake( + conn, transport_mock, server_properties=server_properties + ) + on_open_mock.assert_called_once_with(conn) + security_mechanism = sasl.AMQPLAIN( + 'guest', 'guest' + ).start(conn).decode('utf-8', 'surrogatepass') + + # Expected responses from client + frame_writer_mock.assert_has_calls( + [ + call( + 1, 0, spec.Connection.StartOk, + # Due Table type, we cannot compare bytestream directly + DataComparator( + 'FsSs', + ( + client_properties, 'AMQPLAIN', + security_mechanism, + 'en_US' + ) + ), + None + ), + call( + 1, 0, spec.Connection.TuneOk, + dumps( + 'BlB', + (conn.channel_max, conn.frame_max, conn.heartbeat) + ), + None + ), + call( + 1, 0, spec.Connection.Open, + dumps('ssb', (conn.virtual_host, '', False)), + None + ) + ] + ) + assert conn.client_properties == client_properties + + def test_connect_missing_capabilities(self): + # Test checking connection handshake with broker + # supporting subset of capabilities + frame_writer_cls_mock = Mock() + on_open_mock = Mock() + frame_writer_mock = frame_writer_cls_mock() + conn = Connection( + frame_writer=frame_writer_cls_mock, on_open=on_open_mock + ) + + with patch.object(conn, 'Transport') as transport_mock: + server_properties = dict(SERVER_PROPERTIES) + server_properties['capabilities'] = { + # This capability is not supported by client + 'basic.nack': True, + 'consumer_cancel_notify': True, + 'connection.blocked': False, + # server does not support 'authentication_failure_close' + # which is supported by client + } + + client_properties = dict(CLIENT_PROPERTIES) + client_properties['capabilities'] = { + 'consumer_cancel_notify': True, + } + + handshake( + conn, transport_mock, server_properties=server_properties + ) + on_open_mock.assert_called_once_with(conn) + security_mechanism = sasl.AMQPLAIN( + 'guest', 'guest' + ).start(conn).decode('utf-8', 'surrogatepass') + + # Expected responses from client + frame_writer_mock.assert_has_calls( + [ + call( + 1, 0, spec.Connection.StartOk, + # Due Table type, we cannot compare bytestream directly + DataComparator( + 'FsSs', + ( + client_properties, 'AMQPLAIN', security_mechanism, 'en_US' ) @@ -216,6 +343,7 @@ class test_connection: ) ] ) + assert conn.client_properties == client_properties def test_connection_close(self): # Test checking closing connection diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index 507136d..dac1b84 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -469,6 +469,31 @@ class test_Channel: (0, 'ex', 'rkey', False, False), 'msg', ) + def test_basic_publish_connection_blocked_not_supported_missing(self): + # Test veryfying that when server does not have + # connection.blocked capability, drain_events() are not called + self.conn.client_properties = { + 'capabilities': {} + } + self.c._basic_publish('msg', 'ex', 'rkey') + self.conn.drain_events.assert_not_called() + self.c.send_method.assert_called_once_with( + spec.Basic.Publish, 'Bssbb', + (0, 'ex', 'rkey', False, False), 'msg', + ) + + def test_basic_publish_connection_blocked_no_capabilities(self): + # Test veryfying that when server does not have + # support of capabilities, drain_events() are not called + self.conn.client_properties = { + } + self.c._basic_publish('msg', 'ex', 'rkey') + self.conn.drain_events.assert_not_called() + self.c.send_method.assert_called_once_with( + spec.Basic.Publish, 'Bssbb', + (0, 'ex', 'rkey', False, False), 'msg', + ) + def test_basic_publish_confirm_callback(self): def wait_nack(method, *args, **kwargs): |