diff options
author | Matus Valo <matusvalo@users.noreply.github.com> | 2018-11-07 15:19:24 +0100 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2018-11-07 16:19:24 +0200 |
commit | b0bc72fa09ad36635f61161e357b8a0b633cf6fb (patch) | |
tree | 9943a4d6e18c0a6080cbbb30d7d3f8343fcc629f | |
parent | 88794b4de3e6ce5d8201ac23df900152c6426a38 (diff) | |
download | py-amqp-b0bc72fa09ad36635f61161e357b8a0b633cf6fb.tar.gz |
Drain events before publish data. (#214)
* Drain events before publish data.
Data are drained to checked if server sent connection blocked/unblocked notification.
* s/assert_called_with/assert_called_once_with/
* Add unittest of connection blocked when broker does not support it
* Improve naming of tests
* Added unittest for publishing when connection is closed
-rw-r--r-- | amqp/channel.py | 9 | ||||
-rw-r--r-- | t/unit/test_channel.py | 53 |
2 files changed, 58 insertions, 4 deletions
diff --git a/amqp/channel.py b/amqp/channel.py index db7b8cc..785ad7b 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -1732,6 +1732,15 @@ class Channel(AbstractChannel): if not self.connection: raise RecoverableConnectionError( 'basic_publish: connection closed') + + client_properties = self.connection.client_properties + if client_properties['capabilities']['connection.blocked']: + try: + # Check if an event was sent, such as the out of memory message + self.connection.drain_events(timeout=0) + except socket.timeout: + pass + try: with self.connection.transport.having_timeout(timeout): return self.send_method( diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index d6faff4..3e7e5ff 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -1,20 +1,22 @@ from __future__ import absolute_import, unicode_literals import pytest -from case import ContextMock, Mock, patch, ANY +import socket +from case import ContextMock, Mock, patch, ANY, MagicMock from amqp import spec from amqp.platform import pack from amqp.serialization import dumps from amqp.channel import Channel -from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked +from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked, \ + RecoverableConnectionError class test_Channel: @pytest.fixture(autouse=True) def setup_conn(self): - self.conn = Mock(name='connection') + self.conn = MagicMock(name='connection') self.conn.channels = {} self.conn._get_free_channel_id.return_value = 2 self.c = Channel(self.conn, 1) @@ -366,7 +368,43 @@ class test_Channel: spec.Basic.Nack, frame, None ) - def test_basic_publsh_confirm_callback(self): + def test_basic_publish_connection_blocked(self): + # Basic test checking that drain_events() is called + # before publishing message and send_method() is called + self.c._basic_publish('msg', 'ex', 'rkey') + self.conn.drain_events.assert_called_once_with(timeout=0) + self.c.send_method.assert_called_once_with( + spec.Basic.Publish, 'Bssbb', + (0, 'ex', 'rkey', False, False), 'msg', + ) + + self.c.send_method.reset_mock() + + # Basic test checking that socket.timeout exception + # is ignored and send_method() is called. + self.conn.drain_events.side_effect = socket.timeout + self.c._basic_publish('msg', 'ex', 'rkey') + self.c.send_method.assert_called_once_with( + spec.Basic.Publish, 'Bssbb', + (0, 'ex', 'rkey', False, False), 'msg', + ) + + def test_basic_publish_connection_blocked_not_supported(self): + # Test veryfying that when server does not have + # connection.blocked capability, drain_events() are not called + self.conn.client_properties = { + 'capabilities': { + 'connection.blocked': False + } + } + self.c._basic_publish('msg', 'ex', 'rkey') + self.conn.drain_events.assert_not_called() + self.c.send_method.assert_called_once_with( + spec.Basic.Publish, 'Bssbb', + (0, 'ex', 'rkey', False, False), 'msg', + ) + + def test_basic_publish_confirm_callback(self): def wait_nack(method, *args, **kwargs): kwargs['callback'](spec.Basic.Nack) @@ -388,6 +426,13 @@ class test_Channel: # it must nost raise exception self.c.basic_publish_confirm(1, 2, arg=1) + def test_basic_publish_connection_closed(self): + self.c.collect() + with pytest.raises(RecoverableConnectionError) as excinfo: + self.c._basic_publish('msg', 'ex', 'rkey') + assert 'basic_publish: connection closed' in str(excinfo.value) + self.c.send_method.assert_not_called() + def test_basic_qos(self): self.c.basic_qos(0, 123, False) self.c.send_method.assert_called_with( |