summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-05-08 18:21:10 +0200
committerMatus Valo <matusvalo@gmail.com>2020-05-12 09:56:04 +0200
commite0c27f3cfee2e4bdc4251369a96acd1b715c3b13 (patch)
treee2358322f555574087488e2068e4e6327c8c2073
parent0d3b1e254f9178828f62b7b84f0307882e28e2a0 (diff)
downloadkombu-e0c27f3cfee2e4bdc4251369a96acd1b715c3b13.tar.gz
Added priority integration tests for Redis
-rw-r--r--t/integration/common.py45
-rw-r--r--t/integration/test_redis.py58
2 files changed, 96 insertions, 7 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index 4d506ad0..6e6fd017 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -204,7 +204,20 @@ class BaseTimeToLive(object):
class BasePriority(object):
+
+ PRIORITY_ORDER = 'asc'
+
def test_publish_consume(self, connection):
+
+ # py-amqp transport has higher numbers higher priority
+ # redis transport has lower numbers higher priority
+ if self.PRIORITY_ORDER == 'asc':
+ prio_high = 6
+ prio_low = 3
+ else:
+ prio_high = 3
+ prio_low = 6
+
test_queue = kombu.Queue(
'priority_test', routing_key='priority_test', max_priority=10
)
@@ -219,8 +232,8 @@ class BasePriority(object):
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
- [{'msg': 'first'}, 3],
- [{'msg': 'second'}, 6]
+ [{'msg': 'first'}, prio_low],
+ [{'msg': 'second'}, prio_high]
]:
producer.publish(
msg,
@@ -244,6 +257,12 @@ class BasePriority(object):
assert received_messages[1] == {'msg': 'first'}
def test_simple_queue_publish_consume(self, connection):
+ if self.PRIORITY_ORDER == 'asc':
+ prio_high = 7
+ prio_low = 1
+ else:
+ prio_high = 1
+ prio_low = 7
with connection as conn:
with closing(
conn.SimpleQueue(
@@ -251,8 +270,12 @@ class BasePriority(object):
queue_opts={'max_priority': 10}
)
) as queue:
- queue.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=3)
- queue.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6)
+ queue.put(
+ {'msg': 'first'}, headers={'k1': 'v1'}, priority=prio_low
+ )
+ queue.put(
+ {'msg': 'second'}, headers={'k1': 'v1'}, priority=prio_high
+ )
# Sleep to make sure that queue sorted based on priority
sleep(0.5)
# Second message must be received first
@@ -264,6 +287,12 @@ class BasePriority(object):
assert msg.payload == {'msg': 'first'}
def test_simple_buffer_publish_consume(self, connection):
+ if self.PRIORITY_ORDER == 'asc':
+ prio_high = 6
+ prio_low = 2
+ else:
+ prio_high = 2
+ prio_low = 6
with connection as conn:
with closing(
conn.SimpleBuffer(
@@ -271,8 +300,12 @@ class BasePriority(object):
queue_opts={'max_priority': 10}
)
) as buf:
- buf.put({'msg': 'first'}, headers={'k1': 'v1'}, priority=2)
- buf.put({'msg': 'second'}, headers={'k1': 'v1'}, priority=6)
+ buf.put(
+ {'msg': 'first'}, headers={'k1': 'v1'}, priority=prio_low
+ )
+ buf.put(
+ {'msg': 'second'}, headers={'k1': 'v1'}, priority=prio_high
+ )
# Sleep to make sure that queue sorted based on priority
sleep(0.5)
# Second message must be received first
diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py
index 4c9167bf..c5ac0019 100644
--- a/t/integration/test_redis.py
+++ b/t/integration/test_redis.py
@@ -4,8 +4,9 @@ import os
import pytest
import kombu
+from time import sleep
-from .common import BasicFunctionality, BaseExchangeTypes
+from .common import BasicFunctionality, BaseExchangeTypes, BasePriority
def get_connection(
@@ -35,3 +36,58 @@ class test_RedisBasicFunctionality(BasicFunctionality):
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_RedisBaseExchangeTypes(BaseExchangeTypes):
pass
+
+
+@pytest.mark.env('redis')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_RedisPriority(BasePriority):
+
+ # Comparing to py-amqp transport has Redis transport several
+ # differences:
+ # 1. Order of priorities is reversed
+ # 2. drain_events() consumes only single value
+
+ # redis transport has lower numbers higher priority
+ PRIORITY_ORDER = 'desc'
+
+ def test_publish_consume(self, connection):
+ test_queue = kombu.Queue(
+ 'priority_test', routing_key='priority_test', max_priority=10
+ )
+
+ received_messages = []
+
+ def callback(body, message):
+ received_messages.append(body)
+ message.ack()
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ for msg, prio in [
+ [{'msg': 'first'}, 6],
+ [{'msg': 'second'}, 3]
+ ]:
+ producer.publish(
+ msg,
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle',
+ priority=prio
+ )
+ # Sleep to make sure that queue sorted based on priority
+ sleep(0.5)
+ consumer = kombu.Consumer(
+ conn, [test_queue], accept=['pickle']
+ )
+ consumer.register_callback(callback)
+ with consumer:
+ # drain_events() returns just on number in
+ # Virtual transports
+ conn.drain_events(timeout=1)
+ conn.drain_events(timeout=1)
+ # Second message must be received first
+ assert received_messages[0] == {'msg': 'second'}
+ assert received_messages[1] == {'msg': 'first'}