diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-04-30 00:01:12 +0200 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-05-03 09:39:28 +0600 |
commit | 75667082e0d6031ef5029956bd8dcff3c44f1d73 (patch) | |
tree | b10477100595413e94b94d05ad6a872c8cd2b809 /t | |
parent | 1b89912a83124f8887e2fd601f64e432b942d7e2 (diff) | |
download | kombu-75667082e0d6031ef5029956bd8dcff3c44f1d73.tar.gz |
Initial redis integration tests implementation
Diffstat (limited to 't')
-rw-r--r-- | t/integration/test_redis.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py new file mode 100644 index 00000000..f4a94a95 --- /dev/null +++ b/t/integration/test_redis.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import, unicode_literals + +from contextlib import closing +import os + +import pytest +import kombu + +def get_connection( + hostname, port, vhost): + return kombu.Connection('redis://{}:{}'.format(hostname, port)) + + +@pytest.fixture() +def connection(request): + # this fixture yields plain connections to broker and TLS encrypted + return get_connection( + hostname=os.environ.get('REDIS_HOST', 'localhost'), + port=os.environ.get('REDIS_6379_TCP', '6379'), + vhost=getattr( + request.config, "slaveinput", {} + ).get("slaveid", None), + ) + +@pytest.mark.env('redis') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_connect(connection): + connection.connect() + connection.close() + +@pytest.mark.env('redis') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_publish_consume(connection): + test_queue = kombu.Queue('test', routing_key='test') + + def callback(body, message): + assert body == {'hello': 'world'} + assert message.content_type == 'application/x-python-serialize' + message.delivery_info['routing_key'] == 'test' + message.delivery_info['exchange'] == '' + message.ack() + assert message.payload == body + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + producer.publish( + {'hello': 'world'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle' + ) + + consumer = kombu.Consumer(conn, [test_queue], accept=['pickle']) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + + +@pytest.mark.env('redis') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_simple_publish_consume(connection): + with connection as conn: + with closing(conn.SimpleQueue('simple_test')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get(timeout=1) + assert message.payload == {'Hello': 'World'} + assert message.content_type == 'application/json' + assert message.content_encoding == 'utf-8' + assert message.headers == {'k1': 'v1'} + message.ack() |