diff options
-rw-r--r-- | t/integration/common.py | 90 | ||||
-rw-r--r-- | t/integration/test_py_amqp.py | 8 | ||||
-rw-r--r-- | t/integration/test_redis.py | 8 |
3 files changed, 104 insertions, 2 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index 75e75202..acffb7fc 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -1,7 +1,9 @@ from __future__ import absolute_import, unicode_literals +import socket from contextlib import closing +import pytest import kombu @@ -62,3 +64,91 @@ class BasicFunctionality(object): assert message.content_encoding == 'utf-8' assert message.headers == {'k1': 'v1'} message.ack() + + +class BaseExchangeTypes(object): + + def _callback(self, body, message): + message.ack() + assert body == {'hello': 'world'} + assert message.content_type == 'application/x-python-serialize' + message.delivery_info['routing_key'] == 'test' + message.delivery_info['exchange'] == '' + assert message.payload == body + + def _consume(self, connection, queue): + consumer = kombu.Consumer( + connection, [queue], accept=['pickle'] + ) + consumer.register_callback(self._callback) + with consumer: + connection.drain_events(timeout=1) + + def _publish(self, channel, exchange, queues, routing_key=None): + producer = kombu.Producer(channel, exchange=exchange) + if routing_key: + producer.publish( + {'hello': 'world'}, + declare=list(queues), + serializer='pickle', + routing_key=routing_key + ) + else: + producer.publish( + {'hello': 'world'}, + declare=list(queues), + serializer='pickle' + ) + + def test_direct(self, connection): + ex = kombu.Exchange('test_direct', type='direct') + test_queue = kombu.Queue('direct1', exchange=ex) + + with connection as conn: + with conn.channel() as channel: + self._publish(channel, ex, [test_queue]) + self._consume(conn, test_queue) + + def test_direct_routing_keys(self, connection): + ex = kombu.Exchange('test_rk_direct', type='direct') + test_queue1 = kombu.Queue('rk_direct1', exchange=ex, routing_key='d1') + test_queue2 = kombu.Queue('rk_direct2', exchange=ex, routing_key='d2') + + with connection as conn: + with conn.channel() as channel: + self._publish(channel, ex, [test_queue1, test_queue2], 'd1') + self._consume(conn, test_queue1) + # direct2 queue should not have data + with pytest.raises(socket.timeout): + self._consume(conn, test_queue2) + + def test_fanout(self, connection): + ex = kombu.Exchange('test_fanout', type='fanout') + test_queue1 = kombu.Queue('fanout1', exchange=ex) + test_queue2 = kombu.Queue('fanout2', exchange=ex) + + with connection as conn: + with conn.channel() as channel: + self._publish(channel, ex, [test_queue1, test_queue2]) + + self._consume(conn, test_queue1) + self._consume(conn, test_queue2) + + def test_topic(self, connection): + ex = kombu.Exchange('test_topic', type='topic') + test_queue1 = kombu.Queue('topic1', exchange=ex, routing_key='t.*') + test_queue2 = kombu.Queue('topic2', exchange=ex, routing_key='t.*') + test_queue3 = kombu.Queue('topic3', exchange=ex, routing_key='t') + + with connection as conn: + with conn.channel() as channel: + self._publish( + channel, ex, [test_queue1, test_queue2, test_queue3], + routing_key='t.1' + ) + + self._consume(conn, test_queue1) + self._consume(conn, test_queue2) + with pytest.raises(socket.timeout): + # topic3 queue should not have data + self._consume(conn, test_queue3) diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index 6e6585ee..98f6836d 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -5,7 +5,7 @@ import os import pytest import kombu -from .common import BasicFunctionality +from .common import BasicFunctionality, BaseExchangeTypes def get_connection( @@ -29,3 +29,9 @@ def connection(request): @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_PyAMQPBasicFunctionality(BasicFunctionality): pass + + +@pytest.mark.env('py-amqp') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes): + pass diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index d42ba9a6..4c9167bf 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -5,7 +5,7 @@ import os import pytest import kombu -from .common import BasicFunctionality +from .common import BasicFunctionality, BaseExchangeTypes def get_connection( @@ -29,3 +29,9 @@ def connection(request): @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_RedisBasicFunctionality(BasicFunctionality): pass + + +@pytest.mark.env('redis') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_RedisBaseExchangeTypes(BaseExchangeTypes): + pass |