diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-05-07 00:59:25 +0200 |
---|---|---|
committer | Matus Valo <matusvalo@gmail.com> | 2020-05-08 11:32:46 +0200 |
commit | 0d3b1e254f9178828f62b7b84f0307882e28e2a0 (patch) | |
tree | 3eee0264b64809437cdc51901c4ec511ed67df3a /t | |
parent | 5113df342eb4eb7a2033d56c2dc958874ab24059 (diff) | |
download | kombu-0d3b1e254f9178828f62b7b84f0307882e28e2a0.tar.gz |
Added integration tests for priority queues
Diffstat (limited to 't')
-rw-r--r-- | t/integration/common.py | 81 | ||||
-rw-r--r-- | t/integration/test_py_amqp.py | 11 |
2 files changed, 91 insertions, 1 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index a1f96e63..4d506ad0 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -201,3 +201,84 @@ class BaseTimeToLive(object): sleep(3) with pytest.raises(buf.Empty): buf.get(timeout=1) + + +class BasePriority(object): + def test_publish_consume(self, connection): + test_queue = kombu.Queue( + 'priority_test', routing_key='priority_test', max_priority=10 + ) + + received_messages = [] + + def callback(body, message): + received_messages.append(body) + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 3], + [{'msg': 'second'}, 6] + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + # Second message must be received first + assert received_messages[0] == {'msg': 'second'} + assert received_messages[1] == {'msg': 'first'} + + def test_simple_queue_publish_consume(self, connection): + with connection as conn: + with closing( + conn.SimpleQueue( + 'priority_simple_queue_test', + queue_opts={'max_priority': 10} + ) + ) as queue: + queue.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=3) + queue.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + # Second message must be received first + msg = queue.get(timeout=1) + msg.ack() + assert msg.payload == {'msg': 'second'} + msg = queue.get(timeout=1) + msg.ack() + assert msg.payload == {'msg': 'first'} + + def test_simple_buffer_publish_consume(self, connection): + with connection as conn: + with closing( + conn.SimpleBuffer( + 'priority_simple_buffer_test', + queue_opts={'max_priority': 10} + ) + ) as buf: + buf.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=2) + buf.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + # Second message must be received first + msg = buf.get(timeout=1) + msg.ack() + assert msg.payload == {'msg': 'second'} + msg = buf.get(timeout=1) + msg.ack() + assert msg.payload == {'msg': 'first'} diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index dfc17ad0..670e7009 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -5,7 +5,10 @@ import os import pytest import kombu -from .common import BasicFunctionality, BaseExchangeTypes, BaseTimeToLive +from .common import ( + BasicFunctionality, BaseExchangeTypes, + BaseTimeToLive, BasePriority +) def get_connection( @@ -41,3 +44,9 @@ class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes): @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_PyAMQPTimeToLive(BaseTimeToLive): pass + + +@pytest.mark.env('py-amqp') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_PyAMQPPriority(BasePriority): + pass |