diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-05-08 18:21:10 +0200 |
---|---|---|
committer | Matus Valo <matusvalo@gmail.com> | 2020-05-12 09:56:04 +0200 |
commit | e0c27f3cfee2e4bdc4251369a96acd1b715c3b13 (patch) | |
tree | e2358322f555574087488e2068e4e6327c8c2073 | |
parent | 0d3b1e254f9178828f62b7b84f0307882e28e2a0 (diff) | |
download | kombu-e0c27f3cfee2e4bdc4251369a96acd1b715c3b13.tar.gz |
Added priority integration tests for Redis
-rw-r--r-- | t/integration/common.py | 45 | ||||
-rw-r--r-- | t/integration/test_redis.py | 58 |
2 files changed, 96 insertions, 7 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index 4d506ad0..6e6fd017 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -204,7 +204,20 @@ class BaseTimeToLive(object): class BasePriority(object): + + PRIORITY_ORDER = 'asc' + def test_publish_consume(self, connection): + + # py-amqp transport has higher numbers higher priority + # redis transport has lower numbers higher priority + if self.PRIORITY_ORDER == 'asc': + prio_high = 6 + prio_low = 3 + else: + prio_high = 3 + prio_low = 6 + test_queue = kombu.Queue( 'priority_test', routing_key='priority_test', max_priority=10 ) @@ -219,8 +232,8 @@ class BasePriority(object): with conn.channel() as channel: producer = kombu.Producer(channel) for msg, prio in [ - [{'msg': 'first'}, 3], - [{'msg': 'second'}, 6] + [{'msg': 'first'}, prio_low], + [{'msg': 'second'}, prio_high] ]: producer.publish( msg, @@ -244,6 +257,12 @@ class BasePriority(object): assert received_messages[1] == {'msg': 'first'} def test_simple_queue_publish_consume(self, connection): + if self.PRIORITY_ORDER == 'asc': + prio_high = 7 + prio_low = 1 + else: + prio_high = 1 + prio_low = 7 with connection as conn: with closing( conn.SimpleQueue( @@ -251,8 +270,12 @@ class BasePriority(object): queue_opts={'max_priority': 10} ) ) as queue: - queue.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=3) - queue.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6) + queue.put( + {'msg': 'first'}, headers={'k1': 'v1'}, priority=prio_low + ) + queue.put( + {'msg': 'second'}, headers={'k1': 'v1'}, priority=prio_high + ) # Sleep to make sure that queue sorted based on priority sleep(0.5) # Second message must be received first @@ -264,6 +287,12 @@ class BasePriority(object): assert msg.payload == {'msg': 'first'} def test_simple_buffer_publish_consume(self, connection): + if self.PRIORITY_ORDER == 'asc': + prio_high = 6 + prio_low = 2 + else: + prio_high = 2 + prio_low = 6 with connection as conn: with closing( conn.SimpleBuffer( @@ -271,8 +300,12 @@ class BasePriority(object): queue_opts={'max_priority': 10} ) ) as buf: - buf.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=2) - buf.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6) + buf.put( + {'msg': 'first'}, headers={'k1': 'v1'}, priority=prio_low + ) + buf.put( + {'msg': 'second'}, headers={'k1': 'v1'}, priority=prio_high + ) # Sleep to make sure that queue sorted based on priority sleep(0.5) # Second message must be received first diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index 4c9167bf..c5ac0019 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -4,8 +4,9 @@ import os import pytest import kombu +from time import sleep -from .common import BasicFunctionality, BaseExchangeTypes +from .common import BasicFunctionality, BaseExchangeTypes, BasePriority def get_connection( @@ -35,3 +36,58 @@ class test_RedisBasicFunctionality(BasicFunctionality): @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_RedisBaseExchangeTypes(BaseExchangeTypes): pass + + +@pytest.mark.env('redis') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_RedisPriority(BasePriority): + + # Comparing to py-amqp transport has Redis transport several + # differences: + # 1. Order of priorities is reversed + # 2. drain_events() consumes only single value + + # redis transport has lower numbers higher priority + PRIORITY_ORDER = 'desc' + + def test_publish_consume(self, connection): + test_queue = kombu.Queue( + 'priority_test', routing_key='priority_test', max_priority=10 + ) + + received_messages = [] + + def callback(body, message): + received_messages.append(body) + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 6], + [{'msg': 'second'}, 3] + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + # drain_events() returns just on number in + # Virtual transports + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + # Second message must be received first + assert received_messages[0] == {'msg': 'second'} + assert received_messages[1] == {'msg': 'first'} |