summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-05-04 14:59:56 +0200
committerMatus Valo <matusvalo@gmail.com>2020-05-06 08:42:41 +0200
commit5d55eba3ecfc4eb673c5273d8f33017164bb7801 (patch)
tree14148caf4a51b6132191d4608d83fefb38fc37f5
parent90f2afb631adde4fc52827ab0486dfbe6d0277a2 (diff)
downloadkombu-5d55eba3ecfc4eb673c5273d8f33017164bb7801.tar.gz
Added Integration tests for direct, topic and fanout exchange types
-rw-r--r--t/integration/common.py90
-rw-r--r--t/integration/test_py_amqp.py8
-rw-r--r--t/integration/test_redis.py8
3 files changed, 104 insertions, 2 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index 75e75202..acffb7fc 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -1,7 +1,9 @@
from __future__ import absolute_import, unicode_literals
+import socket
from contextlib import closing
+import pytest
import kombu
@@ -62,3 +64,91 @@ class BasicFunctionality(object):
assert message.content_encoding == 'utf-8'
assert message.headers == {'k1': 'v1'}
message.ack()
+
+
+class BaseExchangeTypes(object):
+
+ def _callback(self, body, message):
+ message.ack()
+ assert body == {'hello': 'world'}
+ assert message.content_type == 'application/x-python-serialize'
+ message.delivery_info['routing_key'] == 'test'
+ message.delivery_info['exchange'] == ''
+ assert message.payload == body
+
+ def _consume(self, connection, queue):
+ consumer = kombu.Consumer(
+ connection, [queue], accept=['pickle']
+ )
+ consumer.register_callback(self._callback)
+ with consumer:
+ connection.drain_events(timeout=1)
+
+ def _publish(self, channel, exchange, queues, routing_key=None):
+ producer = kombu.Producer(channel, exchange=exchange)
+ if routing_key:
+ producer.publish(
+ {'hello': 'world'},
+ declare=list(queues),
+ serializer='pickle',
+ routing_key=routing_key
+ )
+ else:
+ producer.publish(
+ {'hello': 'world'},
+ declare=list(queues),
+ serializer='pickle'
+ )
+
+ def test_direct(self, connection):
+ ex = kombu.Exchange('test_direct', type='direct')
+ test_queue = kombu.Queue('direct1', exchange=ex)
+
+ with connection as conn:
+ with conn.channel() as channel:
+ self._publish(channel, ex, [test_queue])
+ self._consume(conn, test_queue)
+
+ def test_direct_routing_keys(self, connection):
+ ex = kombu.Exchange('test_rk_direct', type='direct')
+ test_queue1 = kombu.Queue('rk_direct1', exchange=ex, routing_key='d1')
+ test_queue2 = kombu.Queue('rk_direct2', exchange=ex, routing_key='d2')
+
+ with connection as conn:
+ with conn.channel() as channel:
+ self._publish(channel, ex, [test_queue1, test_queue2], 'd1')
+ self._consume(conn, test_queue1)
+ # direct2 queue should not have data
+ with pytest.raises(socket.timeout):
+ self._consume(conn, test_queue2)
+
+ def test_fanout(self, connection):
+ ex = kombu.Exchange('test_fanout', type='fanout')
+ test_queue1 = kombu.Queue('fanout1', exchange=ex)
+ test_queue2 = kombu.Queue('fanout2', exchange=ex)
+
+ with connection as conn:
+ with conn.channel() as channel:
+ self._publish(channel, ex, [test_queue1, test_queue2])
+
+ self._consume(conn, test_queue1)
+ self._consume(conn, test_queue2)
+
+ def test_topic(self, connection):
+ ex = kombu.Exchange('test_topic', type='topic')
+ test_queue1 = kombu.Queue('topic1', exchange=ex, routing_key='t.*')
+ test_queue2 = kombu.Queue('topic2', exchange=ex, routing_key='t.*')
+ test_queue3 = kombu.Queue('topic3', exchange=ex, routing_key='t')
+
+ with connection as conn:
+ with conn.channel() as channel:
+ self._publish(
+ channel, ex, [test_queue1, test_queue2, test_queue3],
+ routing_key='t.1'
+ )
+
+ self._consume(conn, test_queue1)
+ self._consume(conn, test_queue2)
+ with pytest.raises(socket.timeout):
+ # topic3 queue should not have data
+ self._consume(conn, test_queue3)
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
index 6e6585ee..98f6836d 100644
--- a/t/integration/test_py_amqp.py
+++ b/t/integration/test_py_amqp.py
@@ -5,7 +5,7 @@ import os
import pytest
import kombu
-from .common import BasicFunctionality
+from .common import BasicFunctionality, BaseExchangeTypes
def get_connection(
@@ -29,3 +29,9 @@ def connection(request):
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPBasicFunctionality(BasicFunctionality):
pass
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes):
+ pass
diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py
index d42ba9a6..4c9167bf 100644
--- a/t/integration/test_redis.py
+++ b/t/integration/test_redis.py
@@ -5,7 +5,7 @@ import os
import pytest
import kombu
-from .common import BasicFunctionality
+from .common import BasicFunctionality, BaseExchangeTypes
def get_connection(
@@ -29,3 +29,9 @@ def connection(request):
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_RedisBasicFunctionality(BasicFunctionality):
pass
+
+
+@pytest.mark.env('redis')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_RedisBaseExchangeTypes(BaseExchangeTypes):
+ pass