diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-04-29 23:11:54 +0200 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-04-30 10:43:50 +0600 |
commit | 02bfccf9f795bb153b62e0fb2fb42692ac8764e6 (patch) | |
tree | 579628462c6f56570e39e07815e90b1f0029f6a4 /t | |
parent | 3155e9407f3b7169ce13bcf57b13673506476a81 (diff) | |
download | kombu-02bfccf9f795bb153b62e0fb2fb42692ac8764e6.tar.gz |
Added basic integration tests for RabbitMQ
Diffstat (limited to 't')
-rw-r--r-- | t/integration/test_py_amqp.py | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index c247b833..30add648 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -1,6 +1,8 @@ from __future__ import absolute_import, unicode_literals +from contextlib import closing import os + import pytest import kombu @@ -27,3 +29,51 @@ def connection(request): def test_connect(connection): connection.connect() connection.close() + + +@pytest.mark.env('py-amqp') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_publish_consume(connection): + test_queue = kombu.Queue('test', routing_key='test') + + def callback(body, message): + assert body == {'hello': 'world'} + assert message.content_type == 'application/x-python-serialize' + assert message.delivery_info['routing_key'] == 'test' + assert message.delivery_info['exchange'] == '' + assert message.headers == {'k1': 'v1'} + assert message.payload == body + + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + producer.publish( + {'hello': 'world'}, + retry=True, + headers={'k1': 'v1'}, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle' + ) + + consumer = kombu.Consumer(conn, [test_queue], accept=['pickle']) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + + +@pytest.mark.env('py-amqp') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_simple_publish_consume(connection): + with connection as conn: + with closing(conn.SimpleQueue('simple_test')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get(timeout=1) + assert message.payload == {'Hello': 'World'} + assert message.content_type == 'application/json' + assert message.content_encoding == 'utf-8' + assert message.headers == {'k1': 'v1'} + message.ack() |