summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2019-01-04 06:18:40 +0100
committerAsif Saif Uddin <auvipy@gmail.com>2019-01-04 11:18:40 +0600
commit6c8b4b45d622a2ca1a154596e9ea2f3621feb43a (patch)
treefb3f67499afcd38a8ad46386d0beba472593a5a2
parentccddd02a71cb53f7aa6f4bad7397dc9060a08dfb (diff)
downloadpy-amqp-6c8b4b45d622a2ca1a154596e9ea2f3621feb43a.tar.gz
queue, exchange and basic.get integration tests (#234)
* Improve return values in doc strings * Added queue and exchange integration tests
-rw-r--r--amqp/channel.py11
-rw-r--r--t/integration/test_integration.py405
2 files changed, 376 insertions, 40 deletions
diff --git a/amqp/channel.py b/amqp/channel.py
index 4942a82..42fee9f 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -1140,8 +1140,8 @@ class Channel(AbstractChannel):
True.
Returns a tuple containing 3 items:
- the name of the queue (essential for automatically-named queues)
- message count
+ the name of the queue (essential for automatically-named queues),
+ message count and
consumer count
"""
self.send_method(
@@ -1220,6 +1220,8 @@ class Channel(AbstractChannel):
client should not wait for a reply method. If the
server could not complete the method it will raise a
channel or connection exception.
+
+ If nowait is False, returns the number of deleted messages.
"""
return self.send_method(
spec.Queue.Delete, argsig,
@@ -1280,7 +1282,7 @@ class Channel(AbstractChannel):
server could not complete the method it will raise a
channel or connection exception.
- if nowait is False, returns a message_count
+ If nowait is False, returns a number of purged messages.
"""
return self.send_method(
spec.Queue.Purge, argsig, (0, queue, nowait),
@@ -1647,7 +1649,8 @@ class Channel(AbstractChannel):
reliability. Messages can get lost if a client dies
before it can deliver them to the application.
- Non-blocking, returns a message object, or None.
+ Non-blocking, returns a amqp.basic_message.Message object,
+ or None if queue is empty.
"""
ret = self.send_method(
spec.Basic.Get, argsig, (0, queue, no_ack),
diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py
index 3a20540..e682199 100644
--- a/t/integration/test_integration.py
+++ b/t/integration/test_integration.py
@@ -5,20 +5,11 @@ import pytest
from case import patch, call, Mock
from amqp import spec, Connection, Channel, sasl, Message
from amqp.platform import pack
-from amqp.exceptions import ConnectionError
+from amqp.exceptions import ConnectionError, \
+ InvalidCommand, AccessRefused, PreconditionFailed, NotFound, ResourceLocked
from amqp.serialization import dumps, loads
from amqp.protocol import queue_declare_ok_t
-
-def ret_factory(method, channel=0, args=b'', arg_format=None):
- if len(args) > 0:
- args = dumps(arg_format, args)
- else:
- args = b''
- frame = (b''.join([pack('>HH', *method), args]))
- return 1, channel, frame
-
-
connection_testdata = (
(spec.Connection.Blocked, '_on_blocked'),
(spec.Connection.Unblocked, '_on_unblocked'),
@@ -26,13 +17,49 @@ connection_testdata = (
(spec.Connection.CloseOk, '_on_close_ok'),
)
-
channel_testdata = (
(spec.Basic.Ack, '_on_basic_ack'),
(spec.Basic.Nack, '_on_basic_nack'),
(spec.Basic.CancelOk, '_on_basic_cancel_ok'),
)
+exchange_declare_error_testdata = (
+ (
+ 503, "COMMAND_INVALID - "
+ "unknown exchange type 'exchange-type'",
+ InvalidCommand
+ ),
+ (
+ 403, "ACCESS_REFUSED - "
+ "exchange name 'amq.foo' contains reserved prefix 'amq.*'",
+ AccessRefused
+ ),
+ (
+ 406, "PRECONDITION_FAILED - "
+ "inequivalent arg 'type' for exchange 'foo' in vhost '/':"
+ "received 'direct' but current is 'fanout'",
+ PreconditionFailed
+ ),
+)
+
+queue_declare_error_testdata = (
+ (
+ 403, "ACCESS_REFUSED - "
+ "queue name 'amq.foo' contains reserved prefix 'amq.*",
+ AccessRefused
+ ),
+ (
+ 404, "NOT_FOUND - "
+ "no queue 'foo' in vhost '/'",
+ NotFound
+ ),
+ (
+ 405, "RESOURCE_LOCKED - "
+ "cannot obtain exclusive access to locked queue 'foo' in vhost '/'",
+ ResourceLocked
+ ),
+)
+
CLIENT_CAPABILITIES = {
'product': 'py-amqp',
'product_version': '2.3.2',
@@ -64,6 +91,26 @@ SERVER_CAPABILITIES = {
}
+def build_frame_type_1(method, channel=0, args=b'', arg_format=None):
+ if len(args) > 0:
+ args = dumps(arg_format, args)
+ else:
+ args = b''
+ frame = (b''.join([pack('>HH', *method), args]))
+ return 1, channel, frame
+
+
+def build_frame_type_2(body_len, channel, properties):
+ frame = (b''.join(
+ [pack('>HxxQ', spec.Basic.CLASS_ID, body_len), properties])
+ )
+ return 2, channel, frame
+
+
+def build_frame_type_3(channel, body):
+ return 3, channel, body
+
+
class DataComparator(object):
# Comparator used for asserting serialized data. It can be used
# in cases when direct comparision of bytestream cannot be used
@@ -80,18 +127,18 @@ class DataComparator(object):
def handshake(conn, transport_mock):
# Helper function simulating connection handshake with server
transport_mock().read_frame.side_effect = [
- ret_factory(
+ build_frame_type_1(
spec.Connection.Start, channel=0,
args=(
0, 9, SERVER_CAPABILITIES, 'AMQPLAIN PLAIN', 'en_US'
),
arg_format='ooFSS'
),
- ret_factory(
+ build_frame_type_1(
spec.Connection.Tune, channel=0,
args=(2047, 131072, 60), arg_format='BlB'
),
- ret_factory(
+ build_frame_type_1(
spec.Connection.OpenOk, channel=0
)
]
@@ -101,7 +148,7 @@ def handshake(conn, transport_mock):
def create_channel(channel_id, conn, transport_mock):
transport_mock().read_frame.side_effect = [
- ret_factory(
+ build_frame_type_1(
spec.Channel.OpenOk,
channel=channel_id,
args=(1, False),
@@ -178,10 +225,8 @@ class test_connection:
handshake(conn, transport_mock)
frame_writer_mock.reset_mock()
# Inject CloseOk response from broker
- transport_mock().read_frame.return_value = ret_factory(
- spec.Connection.CloseOk,
- args=(1, False),
- arg_format='Lb'
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Connection.CloseOk
)
t = conn.transport
conn.close()
@@ -201,7 +246,7 @@ class test_connection:
handshake(conn, transport_mock)
frame_writer_mock.reset_mock()
# Inject Close response from broker
- transport_mock().read_frame.return_value = ret_factory(
+ transport_mock().read_frame.return_value = build_frame_type_1(
spec.Connection.Close,
args=(1, False),
arg_format='Lb'
@@ -231,7 +276,7 @@ class test_channel:
with patch.object(conn, 'Transport') as transport_mock:
handshake(conn, transport_mock)
# Inject desired method
- transport_mock().read_frame.return_value = ret_factory(
+ transport_mock().read_frame.return_value = build_frame_type_1(
method, channel=0, args=(1, False), arg_format='Lb'
)
conn.drain_events(0)
@@ -247,18 +292,16 @@ class test_channel:
channel_id = 1
transport_mock().read_frame.side_effect = [
# Inject Open Handshake
- ret_factory(
+ build_frame_type_1(
spec.Channel.OpenOk,
channel=channel_id,
args=(1, False),
arg_format='Lb'
),
# Inject close method
- ret_factory(
+ build_frame_type_1(
spec.Channel.CloseOk,
- channel=channel_id,
- args=(1, False),
- arg_format='Lb'
+ channel=channel_id
)
]
@@ -300,16 +343,14 @@ class test_channel:
# Replies sent by broker
transport_mock().read_frame.side_effect = [
# Inject close methods
- ret_factory(
+ build_frame_type_1(
spec.Channel.Close,
channel=channel_id,
args=(1, False),
arg_format='Lb'
),
- ret_factory(
- spec.Connection.CloseOk,
- args=(1, False),
- arg_format='Lb'
+ build_frame_type_1(
+ spec.Connection.CloseOk
)
]
conn.close()
@@ -325,7 +366,7 @@ class test_channel:
create_channel(1, conn, transport_mock)
# Inject desired method
- transport_mock().read_frame.return_value = ret_factory(
+ transport_mock().read_frame.return_value = build_frame_type_1(
method,
channel=1,
args=(1, False),
@@ -365,7 +406,7 @@ class test_channel:
ch = create_channel(1, conn, transport_mock)
# Inject ConsumeOk response from Broker
- transport_mock().read_frame.return_value = ret_factory(
+ transport_mock().read_frame.return_value = build_frame_type_1(
spec.Basic.ConsumeOk,
channel=1,
args=(consumer_tag,),
@@ -394,7 +435,7 @@ class test_channel:
ch = create_channel(1, conn, transport_mock)
# Inject ConcumeOk response from Broker
- transport_mock().read_frame.return_value = ret_factory(
+ transport_mock().read_frame.return_value = build_frame_type_1(
spec.Basic.ConsumeOk,
channel=1,
args=('my_tag',),
@@ -425,7 +466,7 @@ class test_channel:
with patch.object(conn, 'Transport') as transport_mock:
handshake(conn, transport_mock)
ch = create_channel(1, conn, transport_mock)
- transport_mock().read_frame.return_value = ret_factory(
+ transport_mock().read_frame.return_value = build_frame_type_1(
spec.Queue.DeclareOk,
channel=1,
arg_format='sll',
@@ -451,3 +492,295 @@ class test_channel:
),
None
)
+
+ @pytest.mark.parametrize(
+ "reply_code, reply_text, exception", queue_declare_error_testdata)
+ def test_queue_declare_error(self, reply_code, reply_text, exception):
+ # Test verifying wrong declaring exchange
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Connection.Close,
+ args=(reply_code, reply_text) + spec.Exchange.Declare,
+ arg_format='BsBB'
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ with pytest.raises(exception) as excinfo:
+ ch.queue_declare('foo')
+ assert excinfo.value.code == reply_code
+ assert excinfo.value.message == reply_text
+ assert excinfo.value.method == 'Exchange.declare'
+ assert excinfo.value.method_name == 'Exchange.declare'
+ assert excinfo.value.method_sig == spec.Exchange.Declare
+ # Client is sending to broker:
+ # 1. Exchange Declare
+ # 2. Connection.CloseOk as reply to received Connecton.Close
+ frame_writer_calls = [
+ call(
+ 1, 1, spec.Queue.Declare,
+ dumps(
+ 'BsbbbbbF',
+ (
+ 0,
+ # queue, passive, durable, exclusive,
+ 'foo', False, False, False,
+ # auto_delete, nowait, arguments
+ True, False, None
+ )
+ ),
+ None
+ ),
+ call(
+ 1, 0, spec.Connection.CloseOk,
+ '',
+ None
+ ),
+ ]
+ frame_writer_mock.assert_has_calls(frame_writer_calls)
+
+ def test_queue_delete(self):
+ # Test verifying deleting queue
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Queue.DeleteOk,
+ channel=1,
+ arg_format='l',
+ args=(5,)
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ msg_count = ch.queue_delete('foo')
+ assert msg_count == 5
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Queue.Delete,
+ dumps(
+ 'Bsbbb',
+ # queue, if_unused, if_empty, nowait
+ (0, 'foo', False, False, False)
+ ),
+ None
+ )
+
+ def test_queue_purge(self):
+ # Test verifying purging queue
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Queue.PurgeOk,
+ channel=1,
+ arg_format='l',
+ args=(4,)
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ msg_count = ch.queue_purge('foo')
+ assert msg_count == 4
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Queue.Purge,
+ dumps(
+ 'Bsb',
+ # queue, nowait
+ (0, 'foo', False)
+ ),
+ None
+ )
+
+ def test_queue_get(self):
+ # Test verifying getting message from queue
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.side_effect = [
+ build_frame_type_1(
+ spec.Basic.GetOk,
+ channel=1,
+ arg_format='Lbssl',
+ args=(
+ # delivery_tag, redelivered, exchange_name
+ 1, False, 'foo_exchange',
+ # routing_key, message_count
+ 'routing_key', 1
+ )
+ ),
+ build_frame_type_2(
+ channel=1,
+ body_len=12,
+ properties=b'0\x00\x00\x00\x00\x00\x01'
+ ),
+ build_frame_type_3(
+ channel=1,
+ body=b'Hello World!'
+ )
+ ]
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ msg = ch.basic_get('foo')
+ assert msg.body_size == 12
+ assert msg.body == b'Hello World!'
+ assert msg.frame_method == spec.Basic.GetOk
+ assert msg.delivery_tag == 1
+ assert msg.ready is True
+ assert msg.delivery_info == {
+ 'delivery_tag': 1, 'redelivered': False,
+ 'exchange': 'foo_exchange',
+ 'routing_key': 'routing_key', 'message_count': 1
+ }
+ assert msg.properties == {
+ 'application_headers': {}, 'delivery_mode': 1
+ }
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Basic.Get,
+ dumps(
+ 'Bsb',
+ # queue, nowait
+ (0, 'foo', False)
+ ),
+ None
+ )
+
+ def test_queue_get_empty(self):
+ # Test verifying getting message from empty queue
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Basic.GetEmpty,
+ channel=1,
+ arg_format='s',
+ args=('s')
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ ret = ch.basic_get('foo')
+ assert ret is None
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Basic.Get,
+ dumps(
+ 'Bsb',
+ # queue, nowait
+ (0, 'foo', False)
+ ),
+ None
+ )
+
+ def test_exchange_declare(self):
+ # Test verifying declaring exchange
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Exchange.DeclareOk,
+ channel=1
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ ret = ch.exchange_declare('foo', 'fanout')
+ assert ret is None
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Exchange.Declare,
+ dumps(
+ 'BssbbbbbF',
+ (
+ 0,
+ # exchange, type, passive, durable,
+ 'foo', 'fanout', False, False,
+ # auto_delete, internal, nowait, arguments
+ True, False, False, None
+ )
+ ),
+ None
+ )
+
+ @pytest.mark.parametrize(
+ "reply_code, reply_text, exception", exchange_declare_error_testdata)
+ def test_exchange_declare_error(self, reply_code, reply_text, exception):
+ # Test verifying wrong declaring exchange
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Connection.Close,
+ args=(reply_code, reply_text) + spec.Exchange.Declare,
+ arg_format='BsBB'
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ with pytest.raises(exception) as excinfo:
+ ch.exchange_declare('exchange', 'exchange-type')
+ assert excinfo.value.code == reply_code
+ assert excinfo.value.message == reply_text
+ assert excinfo.value.method == 'Exchange.declare'
+ assert excinfo.value.method_name == 'Exchange.declare'
+ assert excinfo.value.method_sig == spec.Exchange.Declare
+ # Client is sending to broker:
+ # 1. Exchange Declare
+ # 2. Connection.CloseOk as reply to received Connecton.Close
+ frame_writer_calls = [
+ call(
+ 1, 1, spec.Exchange.Declare,
+ dumps(
+ 'BssbbbbbF',
+ (
+ 0,
+ # exchange, type, passive, durable,
+ 'exchange', 'exchange-type', False, False,
+ # auto_delete, internal, nowait, arguments
+ True, False, False, None
+ )
+ ),
+ None
+ ),
+ call(
+ 1, 0, spec.Connection.CloseOk,
+ '',
+ None
+ ),
+ ]
+ frame_writer_mock.assert_has_calls(frame_writer_calls)
+
+ def test_exchange_delete(self):
+ # Test verifying declaring exchange
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+ transport_mock().read_frame.return_value = build_frame_type_1(
+ spec.Exchange.DeleteOk,
+ channel=1
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ ret = ch.exchange_delete('foo')
+ assert ret == ()
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Exchange.Delete,
+ dumps(
+ 'Bsbb',
+ (
+ 0,
+ # exchange, if-unused, no-wait
+ 'foo', False, False
+ )
+ ),
+ None
+ )