summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-04-29 23:11:54 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2020-04-30 10:43:50 +0600
commit02bfccf9f795bb153b62e0fb2fb42692ac8764e6 (patch)
tree579628462c6f56570e39e07815e90b1f0029f6a4 /t
parent3155e9407f3b7169ce13bcf57b13673506476a81 (diff)
downloadkombu-02bfccf9f795bb153b62e0fb2fb42692ac8764e6.tar.gz
Added basic integration tests for RabbitMQ
Diffstat (limited to 't')
-rw-r--r--t/integration/test_py_amqp.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
index c247b833..30add648 100644
--- a/t/integration/test_py_amqp.py
+++ b/t/integration/test_py_amqp.py
@@ -1,6 +1,8 @@
from __future__ import absolute_import, unicode_literals
+from contextlib import closing
import os
+
import pytest
import kombu
@@ -27,3 +29,51 @@ def connection(request):
def test_connect(connection):
connection.connect()
connection.close()
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+def test_publish_consume(connection):
+ test_queue = kombu.Queue('test', routing_key='test')
+
+ def callback(body, message):
+ assert body == {'hello': 'world'}
+ assert message.content_type == 'application/x-python-serialize'
+ assert message.delivery_info['routing_key'] == 'test'
+ assert message.delivery_info['exchange'] == ''
+ assert message.headers == {'k1': 'v1'}
+ assert message.payload == body
+
+ message.ack()
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ producer.publish(
+ {'hello': 'world'},
+ retry=True,
+ headers={'k1': 'v1'},
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle'
+ )
+
+ consumer = kombu.Consumer(conn, [test_queue], accept=['pickle'])
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+def test_simple_publish_consume(connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('simple_test')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get(timeout=1)
+ assert message.payload == {'Hello': 'World'}
+ assert message.content_type == 'application/json'
+ assert message.content_encoding == 'utf-8'
+ assert message.headers == {'k1': 'v1'}
+ message.ack()