summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-05-07 00:59:25 +0200
committerMatus Valo <matusvalo@gmail.com>2020-05-08 11:32:46 +0200
commit0d3b1e254f9178828f62b7b84f0307882e28e2a0 (patch)
tree3eee0264b64809437cdc51901c4ec511ed67df3a
parent5113df342eb4eb7a2033d56c2dc958874ab24059 (diff)
downloadkombu-0d3b1e254f9178828f62b7b84f0307882e28e2a0.tar.gz
Added integration tests for priority queues
-rw-r--r--t/integration/common.py81
-rw-r--r--t/integration/test_py_amqp.py11
2 files changed, 91 insertions, 1 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index a1f96e63..4d506ad0 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -201,3 +201,84 @@ class BaseTimeToLive(object):
sleep(3)
with pytest.raises(buf.Empty):
buf.get(timeout=1)
+
+
+class BasePriority(object):
+ def test_publish_consume(self, connection):
+ test_queue = kombu.Queue(
+ 'priority_test', routing_key='priority_test', max_priority=10
+ )
+
+ received_messages = []
+
+ def callback(body, message):
+ received_messages.append(body)
+ message.ack()
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ for msg, prio in [
+ [{'msg': 'first'}, 3],
+ [{'msg': 'second'}, 6]
+ ]:
+ producer.publish(
+ msg,
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle',
+ priority=prio
+ )
+ # Sleep to make sure that queue sorted based on priority
+ sleep(0.5)
+ consumer = kombu.Consumer(
+ conn, [test_queue], accept=['pickle']
+ )
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+ # Second message must be received first
+ assert received_messages[0] == {'msg': 'second'}
+ assert received_messages[1] == {'msg': 'first'}
+
+ def test_simple_queue_publish_consume(self, connection):
+ with connection as conn:
+ with closing(
+ conn.SimpleQueue(
+ 'priority_simple_queue_test',
+ queue_opts={'max_priority': 10}
+ )
+ ) as queue:
+ queue.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=3)
+ queue.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6)
+ # Sleep to make sure that queue sorted based on priority
+ sleep(0.5)
+ # Second message must be received first
+ msg = queue.get(timeout=1)
+ msg.ack()
+ assert msg.payload == {'msg': 'second'}
+ msg = queue.get(timeout=1)
+ msg.ack()
+ assert msg.payload == {'msg': 'first'}
+
+ def test_simple_buffer_publish_consume(self, connection):
+ with connection as conn:
+ with closing(
+ conn.SimpleBuffer(
+ 'priority_simple_buffer_test',
+ queue_opts={'max_priority': 10}
+ )
+ ) as buf:
+ buf.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=2)
+ buf.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6)
+ # Sleep to make sure that queue sorted based on priority
+ sleep(0.5)
+ # Second message must be received first
+ msg = buf.get(timeout=1)
+ msg.ack()
+ assert msg.payload == {'msg': 'second'}
+ msg = buf.get(timeout=1)
+ msg.ack()
+ assert msg.payload == {'msg': 'first'}
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
index dfc17ad0..670e7009 100644
--- a/t/integration/test_py_amqp.py
+++ b/t/integration/test_py_amqp.py
@@ -5,7 +5,10 @@ import os
import pytest
import kombu
-from .common import BasicFunctionality, BaseExchangeTypes, BaseTimeToLive
+from .common import (
+ BasicFunctionality, BaseExchangeTypes,
+ BaseTimeToLive, BasePriority
+)
def get_connection(
@@ -41,3 +44,9 @@ class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes):
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPTimeToLive(BaseTimeToLive):
pass
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_PyAMQPPriority(BasePriority):
+ pass