summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-04-30 00:01:12 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2020-05-03 09:39:28 +0600
commit75667082e0d6031ef5029956bd8dcff3c44f1d73 (patch)
treeb10477100595413e94b94d05ad6a872c8cd2b809 /t
parent1b89912a83124f8887e2fd601f64e432b942d7e2 (diff)
downloadkombu-75667082e0d6031ef5029956bd8dcff3c44f1d73.tar.gz
Initial redis integration tests implementation
Diffstat (limited to 't')
-rw-r--r--t/integration/test_redis.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py
new file mode 100644
index 00000000..f4a94a95
--- /dev/null
+++ b/t/integration/test_redis.py
@@ -0,0 +1,73 @@
+from __future__ import absolute_import, unicode_literals
+
+from contextlib import closing
+import os
+
+import pytest
+import kombu
+
+def get_connection(
+ hostname, port, vhost):
+ return kombu.Connection('redis://{}:{}'.format(hostname, port))
+
+
+@pytest.fixture()
+def connection(request):
+ # this fixture yields plain connections to broker and TLS encrypted
+ return get_connection(
+ hostname=os.environ.get('REDIS_HOST', 'localhost'),
+ port=os.environ.get('REDIS_6379_TCP', '6379'),
+ vhost=getattr(
+ request.config, "slaveinput", {}
+ ).get("slaveid", None),
+ )
+
+@pytest.mark.env('redis')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+def test_connect(connection):
+ connection.connect()
+ connection.close()
+
+@pytest.mark.env('redis')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+def test_publish_consume(connection):
+ test_queue = kombu.Queue('test', routing_key='test')
+
+ def callback(body, message):
+ assert body == {'hello': 'world'}
+ assert message.content_type == 'application/x-python-serialize'
+ message.delivery_info['routing_key'] == 'test'
+ message.delivery_info['exchange'] == ''
+ message.ack()
+ assert message.payload == body
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ producer.publish(
+ {'hello': 'world'},
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle'
+ )
+
+ consumer = kombu.Consumer(conn, [test_queue], accept=['pickle'])
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+
+
+@pytest.mark.env('redis')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+def test_simple_publish_consume(connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('simple_test')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get(timeout=1)
+ assert message.payload == {'Hello': 'World'}
+ assert message.content_type == 'application/json'
+ assert message.content_encoding == 'utf-8'
+ assert message.headers == {'k1': 'v1'}
+ message.ack()