summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2018-11-07 15:19:24 +0100
committerOmer Katz <omer.drow@gmail.com>2018-11-07 16:19:24 +0200
commitb0bc72fa09ad36635f61161e357b8a0b633cf6fb (patch)
tree9943a4d6e18c0a6080cbbb30d7d3f8343fcc629f
parent88794b4de3e6ce5d8201ac23df900152c6426a38 (diff)
downloadpy-amqp-b0bc72fa09ad36635f61161e357b8a0b633cf6fb.tar.gz
Drain events before publish data. (#214)
* Drain events before publish data. Data are drained to checked if server sent connection blocked/unblocked notification. * s/assert_called_with/assert_called_once_with/ * Add unittest of connection blocked when broker does not support it * Improve naming of tests * Added unittest for publishing when connection is closed
-rw-r--r--amqp/channel.py9
-rw-r--r--t/unit/test_channel.py53
2 files changed, 58 insertions, 4 deletions
diff --git a/amqp/channel.py b/amqp/channel.py
index db7b8cc..785ad7b 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -1732,6 +1732,15 @@ class Channel(AbstractChannel):
if not self.connection:
raise RecoverableConnectionError(
'basic_publish: connection closed')
+
+ client_properties = self.connection.client_properties
+ if client_properties['capabilities']['connection.blocked']:
+ try:
+ # Check if an event was sent, such as the out of memory message
+ self.connection.drain_events(timeout=0)
+ except socket.timeout:
+ pass
+
try:
with self.connection.transport.having_timeout(timeout):
return self.send_method(
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index d6faff4..3e7e5ff 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -1,20 +1,22 @@
from __future__ import absolute_import, unicode_literals
import pytest
-from case import ContextMock, Mock, patch, ANY
+import socket
+from case import ContextMock, Mock, patch, ANY, MagicMock
from amqp import spec
from amqp.platform import pack
from amqp.serialization import dumps
from amqp.channel import Channel
-from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked
+from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked, \
+ RecoverableConnectionError
class test_Channel:
@pytest.fixture(autouse=True)
def setup_conn(self):
- self.conn = Mock(name='connection')
+ self.conn = MagicMock(name='connection')
self.conn.channels = {}
self.conn._get_free_channel_id.return_value = 2
self.c = Channel(self.conn, 1)
@@ -366,7 +368,43 @@ class test_Channel:
spec.Basic.Nack, frame, None
)
- def test_basic_publsh_confirm_callback(self):
+ def test_basic_publish_connection_blocked(self):
+ # Basic test checking that drain_events() is called
+ # before publishing message and send_method() is called
+ self.c._basic_publish('msg', 'ex', 'rkey')
+ self.conn.drain_events.assert_called_once_with(timeout=0)
+ self.c.send_method.assert_called_once_with(
+ spec.Basic.Publish, 'Bssbb',
+ (0, 'ex', 'rkey', False, False), 'msg',
+ )
+
+ self.c.send_method.reset_mock()
+
+ # Basic test checking that socket.timeout exception
+ # is ignored and send_method() is called.
+ self.conn.drain_events.side_effect = socket.timeout
+ self.c._basic_publish('msg', 'ex', 'rkey')
+ self.c.send_method.assert_called_once_with(
+ spec.Basic.Publish, 'Bssbb',
+ (0, 'ex', 'rkey', False, False), 'msg',
+ )
+
+ def test_basic_publish_connection_blocked_not_supported(self):
+ # Test veryfying that when server does not have
+ # connection.blocked capability, drain_events() are not called
+ self.conn.client_properties = {
+ 'capabilities': {
+ 'connection.blocked': False
+ }
+ }
+ self.c._basic_publish('msg', 'ex', 'rkey')
+ self.conn.drain_events.assert_not_called()
+ self.c.send_method.assert_called_once_with(
+ spec.Basic.Publish, 'Bssbb',
+ (0, 'ex', 'rkey', False, False), 'msg',
+ )
+
+ def test_basic_publish_confirm_callback(self):
def wait_nack(method, *args, **kwargs):
kwargs['callback'](spec.Basic.Nack)
@@ -388,6 +426,13 @@ class test_Channel:
# it must nost raise exception
self.c.basic_publish_confirm(1, 2, arg=1)
+ def test_basic_publish_connection_closed(self):
+ self.c.collect()
+ with pytest.raises(RecoverableConnectionError) as excinfo:
+ self.c._basic_publish('msg', 'ex', 'rkey')
+ assert 'basic_publish: connection closed' in str(excinfo.value)
+ self.c.send_method.assert_not_called()
+
def test_basic_qos(self):
self.c.basic_qos(0, 123, False)
self.c.send_method.assert_called_with(