summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2019-01-15 15:36:10 -0800
committerMatus Valo <matusvalo@gmail.com>2019-01-15 15:36:10 -0800
commit8f153ff690a7c17fcb9f681f46bd1face2e96d94 (patch)
treefeb00d3569402a1a636b57c7f3b58f7fd9bd69a9
parentf507172c1fa88e6c6f50e36762876b3116f4715e (diff)
downloadpy-amqp-fix-243.tar.gz
Fix crash in basic_publish when broker does not support connection.blocked capabilityfix-243
-rw-r--r--amqp/channel.py5
-rw-r--r--t/integration/test_integration.py140
-rw-r--r--t/unit/test_channel.py25
3 files changed, 162 insertions, 8 deletions
diff --git a/amqp/channel.py b/amqp/channel.py
index e4d06ab..39d4cb9 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -1755,8 +1755,9 @@ class Channel(AbstractChannel):
raise RecoverableConnectionError(
'basic_publish: connection closed')
- client_properties = self.connection.client_properties
- if client_properties['capabilities']['connection.blocked']:
+ capabilities = self.connection.\
+ client_properties.get('capabilities', {})
+ if capabilities.get('connection.blocked', False):
try:
# Check if an event was sent, such as the out of memory message
self.connection.drain_events(timeout=0)
diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py
index 57d5e62..2a6228d 100644
--- a/t/integration/test_integration.py
+++ b/t/integration/test_integration.py
@@ -61,17 +61,17 @@ queue_declare_error_testdata = (
),
)
-CLIENT_CAPABILITIES = {
+CLIENT_PROPERTIES = {
'product': 'py-amqp',
'product_version': amqp.__version__,
'capabilities': {
'consumer_cancel_notify': True,
'connection.blocked': True,
'authentication_failure_close': True
- }
+ },
}
-SERVER_CAPABILITIES = {
+SERVER_PROPERTIES = {
'capabilities': {
'publisher_confirms': True,
'exchange_exchange_bindings': True,
@@ -125,13 +125,16 @@ class DataComparator(object):
return tuple(values) == tuple(self.items)
-def handshake(conn, transport_mock):
+def handshake(conn, transport_mock, server_properties=None):
# Helper function simulating connection handshake with server
+ if server_properties is None:
+ server_properties = SERVER_PROPERTIES
+
transport_mock().read_frame.side_effect = [
build_frame_type_1(
spec.Connection.Start, channel=0,
args=(
- 0, 9, SERVER_CAPABILITIES, 'AMQPLAIN PLAIN', 'en_US'
+ 0, 9, server_properties, 'AMQPLAIN PLAIN', 'en_US'
),
arg_format='ooFSS'
),
@@ -194,7 +197,131 @@ class test_connection:
DataComparator(
'FsSs',
(
- CLIENT_CAPABILITIES, 'AMQPLAIN',
+ CLIENT_PROPERTIES, 'AMQPLAIN',
+ security_mechanism,
+ 'en_US'
+ )
+ ),
+ None
+ ),
+ call(
+ 1, 0, spec.Connection.TuneOk,
+ dumps(
+ 'BlB',
+ (conn.channel_max, conn.frame_max, conn.heartbeat)
+ ),
+ None
+ ),
+ call(
+ 1, 0, spec.Connection.Open,
+ dumps('ssb', (conn.virtual_host, '', False)),
+ None
+ )
+ ]
+ )
+ assert conn.client_properties == CLIENT_PROPERTIES
+
+ def test_connect_no_capabilities(self):
+ # Test checking connection handshake with broker
+ # not supporting capabilities
+ frame_writer_cls_mock = Mock()
+ on_open_mock = Mock()
+ frame_writer_mock = frame_writer_cls_mock()
+ conn = Connection(
+ frame_writer=frame_writer_cls_mock, on_open=on_open_mock
+ )
+
+ with patch.object(conn, 'Transport') as transport_mock:
+ server_properties = dict(SERVER_PROPERTIES)
+ del server_properties['capabilities']
+ client_properties = dict(CLIENT_PROPERTIES)
+ del client_properties['capabilities']
+
+ handshake(
+ conn, transport_mock, server_properties=server_properties
+ )
+ on_open_mock.assert_called_once_with(conn)
+ security_mechanism = sasl.AMQPLAIN(
+ 'guest', 'guest'
+ ).start(conn).decode('utf-8', 'surrogatepass')
+
+ # Expected responses from client
+ frame_writer_mock.assert_has_calls(
+ [
+ call(
+ 1, 0, spec.Connection.StartOk,
+ # Due Table type, we cannot compare bytestream directly
+ DataComparator(
+ 'FsSs',
+ (
+ client_properties, 'AMQPLAIN',
+ security_mechanism,
+ 'en_US'
+ )
+ ),
+ None
+ ),
+ call(
+ 1, 0, spec.Connection.TuneOk,
+ dumps(
+ 'BlB',
+ (conn.channel_max, conn.frame_max, conn.heartbeat)
+ ),
+ None
+ ),
+ call(
+ 1, 0, spec.Connection.Open,
+ dumps('ssb', (conn.virtual_host, '', False)),
+ None
+ )
+ ]
+ )
+ assert conn.client_properties == client_properties
+
+ def test_connect_missing_capabilities(self):
+ # Test checking connection handshake with broker
+ # supporting subset of capabilities
+ frame_writer_cls_mock = Mock()
+ on_open_mock = Mock()
+ frame_writer_mock = frame_writer_cls_mock()
+ conn = Connection(
+ frame_writer=frame_writer_cls_mock, on_open=on_open_mock
+ )
+
+ with patch.object(conn, 'Transport') as transport_mock:
+ server_properties = dict(SERVER_PROPERTIES)
+ server_properties['capabilities'] = {
+ # This capability is not supported by client
+ 'basic.nack': True,
+ 'consumer_cancel_notify': True,
+ 'connection.blocked': False,
+ # server does not support 'authentication_failure_close'
+ # which is supported by client
+ }
+
+ client_properties = dict(CLIENT_PROPERTIES)
+ client_properties['capabilities'] = {
+ 'consumer_cancel_notify': True,
+ }
+
+ handshake(
+ conn, transport_mock, server_properties=server_properties
+ )
+ on_open_mock.assert_called_once_with(conn)
+ security_mechanism = sasl.AMQPLAIN(
+ 'guest', 'guest'
+ ).start(conn).decode('utf-8', 'surrogatepass')
+
+ # Expected responses from client
+ frame_writer_mock.assert_has_calls(
+ [
+ call(
+ 1, 0, spec.Connection.StartOk,
+ # Due Table type, we cannot compare bytestream directly
+ DataComparator(
+ 'FsSs',
+ (
+ client_properties, 'AMQPLAIN',
security_mechanism,
'en_US'
)
@@ -216,6 +343,7 @@ class test_connection:
)
]
)
+ assert conn.client_properties == client_properties
def test_connection_close(self):
# Test checking closing connection
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index 507136d..dac1b84 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -469,6 +469,31 @@ class test_Channel:
(0, 'ex', 'rkey', False, False), 'msg',
)
+ def test_basic_publish_connection_blocked_not_supported_missing(self):
+ # Test veryfying that when server does not have
+ # connection.blocked capability, drain_events() are not called
+ self.conn.client_properties = {
+ 'capabilities': {}
+ }
+ self.c._basic_publish('msg', 'ex', 'rkey')
+ self.conn.drain_events.assert_not_called()
+ self.c.send_method.assert_called_once_with(
+ spec.Basic.Publish, 'Bssbb',
+ (0, 'ex', 'rkey', False, False), 'msg',
+ )
+
+ def test_basic_publish_connection_blocked_no_capabilities(self):
+ # Test veryfying that when server does not have
+ # support of capabilities, drain_events() are not called
+ self.conn.client_properties = {
+ }
+ self.c._basic_publish('msg', 'ex', 'rkey')
+ self.conn.drain_events.assert_not_called()
+ self.c.send_method.assert_called_once_with(
+ spec.Basic.Publish, 'Bssbb',
+ (0, 'ex', 'rkey', False, False), 'msg',
+ )
+
def test_basic_publish_confirm_callback(self):
def wait_nack(method, *args, **kwargs):