summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2019-01-10 11:11:29 +0100
committerAsif Saif Uddin <auvipy@gmail.com>2019-01-10 16:11:29 +0600
commit9dcc0df232ce7f041468ad2948fcbf4a08ab6775 (patch)
tree0c4fb7ba8892a37d16a5ee9fb1531b74497c5835
parent6c8b4b45d622a2ca1a154596e9ea2f3621feb43a (diff)
downloadpy-amqp-9dcc0df232ce7f041468ad2948fcbf4a08ab6775.tar.gz
Move delivery_info to constructor of Message. (#235)
-rw-r--r--amqp/basic_message.py5
-rw-r--r--t/integration/test_integration.py66
-rw-r--r--t/unit/test_basic_message.py3
-rw-r--r--t/unit/test_channel.py22
4 files changed, 89 insertions, 7 deletions
diff --git a/amqp/basic_message.py b/amqp/basic_message.py
index 129a8f8..f0857fa 100644
--- a/amqp/basic_message.py
+++ b/amqp/basic_message.py
@@ -102,11 +102,10 @@ class Message(GenericContent):
('cluster_id', 's')
]
- #: set by basic_consume/basic_get
- delivery_info = None
-
def __init__(self, body='', children=None, channel=None, **properties):
super(Message, self).__init__(**properties)
+ #: set by basic_consume/basic_get
+ self.delivery_info = None
self.body = body
self.channel = channel
diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py
index e682199..d400cad 100644
--- a/t/integration/test_integration.py
+++ b/t/integration/test_integration.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals
import socket
import pytest
-from case import patch, call, Mock
+from case import patch, call, Mock, ANY
from amqp import spec, Connection, Channel, sasl, Message
from amqp.platform import pack
from amqp.exceptions import ConnectionError, \
@@ -596,6 +596,70 @@ class test_channel:
None
)
+ def test_basic_deliver(self):
+ # Test checking delivering single message
+ callback_mock = Mock()
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+
+ # Inject ConsumeOk response from Broker
+ transport_mock().read_frame.side_effect = [
+ # Inject Consume-ok response
+ build_frame_type_1(
+ spec.Basic.ConsumeOk,
+ channel=1,
+ args=(consumer_tag,),
+ arg_format='s'
+ ),
+ # Inject basic-deliver response
+ build_frame_type_1(
+ spec.Basic.Deliver,
+ channel=1,
+ arg_format='sLbss',
+ args=(
+ # consumer-tag, delivery-tag, redelivered,
+ consumer_tag, 1, False,
+ # exchange-name, routing-key
+ 'foo_exchange', 'routing-key'
+ )
+ ),
+ build_frame_type_2(
+ channel=1,
+ body_len=12,
+ properties=b'0\x00\x00\x00\x00\x00\x01'
+ ),
+ build_frame_type_3(
+ channel=1,
+ body=b'Hello World!'
+ ),
+ ]
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ ch.basic_consume('my_queue', callback=callback_mock)
+ conn.drain_events()
+ callback_mock.assert_called_once_with(ANY)
+ msg = callback_mock.call_args[0][0]
+ assert isinstance(msg, Message)
+ assert msg.body_size == 12
+ assert msg.body == b'Hello World!'
+ assert msg.frame_method == spec.Basic.Deliver
+ assert msg.delivery_tag == 1
+ assert msg.ready is True
+ assert msg.delivery_info == {
+ 'consumer_tag': 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg',
+ 'delivery_tag': 1,
+ 'redelivered': False,
+ 'exchange': 'foo_exchange',
+ 'routing_key': 'routing-key'
+ }
+ assert msg.properties == {
+ 'application_headers': {}, 'delivery_mode': 1
+ }
+
def test_queue_get(self):
# Test verifying getting message from queue
frame_writer_cls_mock = Mock()
diff --git a/t/unit/test_basic_message.py b/t/unit/test_basic_message.py
index aea8cdb..87225e2 100644
--- a/t/unit/test_basic_message.py
+++ b/t/unit/test_basic_message.py
@@ -13,7 +13,8 @@ class test_Message:
channel=Mock(name='channel'),
application_headers={'h': 'v'},
)
- m.delivery_info = {'delivery_tag': '1234'},
+ m.delivery_info = {'delivery_tag': '1234'}
assert m.body == 'foo'
assert m.channel
assert m.headers == {'h': 'v'}
+ assert m.delivery_tag == '1234'
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index bcea906..ff4e415 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -5,6 +5,7 @@ import socket
from case import ContextMock, Mock, patch, ANY, MagicMock
from amqp import spec
+from amqp.basic_message import Message
from amqp.platform import pack
from amqp.serialization import dumps
from amqp.channel import Channel
@@ -328,11 +329,20 @@ class test_Channel:
assert 123 not in self.c.callbacks
def test_on_basic_deliver(self):
- msg = Mock()
+ msg = Message()
self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)
callback = self.c.callbacks[123] = Mock(name='cb')
+
self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)
callback.assert_called_with(msg)
+ assert msg.channel == self.c
+ assert msg.delivery_info == {
+ 'consumer_tag': 123,
+ 'delivery_tag': '321',
+ 'redelivered': False,
+ 'exchange': 'ex',
+ 'routing_key': 'rkey',
+ }
def test_basic_get(self):
self.c._on_get_empty = Mock()
@@ -356,11 +366,19 @@ class test_Channel:
self.c._on_get_empty(1)
def test_on_get_ok(self):
- msg = Mock()
+ msg = Message()
m = self.c._on_get_ok(
'dtag', 'redelivered', 'ex', 'rkey', 'mcount', msg,
)
assert m is msg
+ assert m.channel == self.c
+ assert m.delivery_info == {
+ 'delivery_tag': 'dtag',
+ 'redelivered': 'redelivered',
+ 'exchange': 'ex',
+ 'routing_key': 'rkey',
+ 'message_count': 'mcount',
+ }
def test_basic_publish(self):
self.c.connection.transport.having_timeout = ContextMock()