diff options
author | Matus Valo <matusvalo@gmail.com> | 2020-05-02 22:07:34 +0200 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-05-03 09:39:28 +0600 |
commit | 7c87441feb491370b3b8f65b6cb263268e26b8c2 (patch) | |
tree | 0d576493f9b9ee679a4ec5f52254643ddadd15b3 | |
parent | 75667082e0d6031ef5029956bd8dcff3c44f1d73 (diff) | |
download | kombu-7c87441feb491370b3b8f65b6cb263268e26b8c2.tar.gz |
Create common class of integration tests
-rw-r--r-- | t/integration/common.py | 53 | ||||
-rw-r--r-- | t/integration/test_py_amqp.py | 56 | ||||
-rw-r--r-- | t/integration/test_redis.py | 52 |
3 files changed, 62 insertions, 99 deletions
diff --git a/t/integration/common.py b/t/integration/common.py new file mode 100644 index 00000000..745ccef1 --- /dev/null +++ b/t/integration/common.py @@ -0,0 +1,53 @@ +from __future__ import absolute_import, unicode_literals + +from contextlib import closing + +import kombu + + +class BasicFunctionality(object): + + def test_connect(self, connection): + connection.connect() + connection.close() + + def test_publish_consume(self, connection): + test_queue = kombu.Queue('test', routing_key='test') + + def callback(body, message): + assert body == {'hello': 'world'} + assert message.content_type == 'application/x-python-serialize' + message.delivery_info['routing_key'] == 'test' + message.delivery_info['exchange'] == '' + message.ack() + assert message.payload == body + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + producer.publish( + {'hello': 'world'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle' + ) + + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + + def test_simple_publish_consume(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('simple_test')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get(timeout=1) + assert message.payload == {'Hello': 'World'} + assert message.content_type == 'application/json' + assert message.content_encoding == 'utf-8' + assert message.headers == {'k1': 'v1'} + message.ack() diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index 30add648..f337be94 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -1,11 +1,12 @@ from __future__ import absolute_import, unicode_literals -from contextlib import closing import os import pytest import kombu +from .common import BasicFunctionality + def get_connection( hostname, port, vhost): @@ -26,54 +27,5 @@ def connection(request): @pytest.mark.env('py-amqp') @pytest.mark.flaky(reruns=5, reruns_delay=2) -def test_connect(connection): - connection.connect() - connection.close() - - -@pytest.mark.env('py-amqp') -@pytest.mark.flaky(reruns=5, reruns_delay=2) -def test_publish_consume(connection): - test_queue = kombu.Queue('test', routing_key='test') - - def callback(body, message): - assert body == {'hello': 'world'} - assert message.content_type == 'application/x-python-serialize' - assert message.delivery_info['routing_key'] == 'test' - assert message.delivery_info['exchange'] == '' - assert message.headers == {'k1': 'v1'} - assert message.payload == body - - message.ack() - - with connection as conn: - with conn.channel() as channel: - producer = kombu.Producer(channel) - producer.publish( - {'hello': 'world'}, - retry=True, - headers={'k1': 'v1'}, - exchange=test_queue.exchange, - routing_key=test_queue.routing_key, - declare=[test_queue], - serializer='pickle' - ) - - consumer = kombu.Consumer(conn, [test_queue], accept=['pickle']) - consumer.register_callback(callback) - with consumer: - conn.drain_events(timeout=1) - - -@pytest.mark.env('py-amqp') -@pytest.mark.flaky(reruns=5, reruns_delay=2) -def test_simple_publish_consume(connection): - with connection as conn: - with closing(conn.SimpleQueue('simple_test')) as queue: - queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) - message = queue.get(timeout=1) - assert message.payload == {'Hello': 'World'} - assert message.content_type == 'application/json' - assert message.content_encoding == 'utf-8' - assert message.headers == {'k1': 'v1'} - message.ack() +class test_BasicFunctionality(BasicFunctionality): + pass diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index f4a94a95..6a132bff 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -1,11 +1,13 @@ from __future__ import absolute_import, unicode_literals -from contextlib import closing import os import pytest import kombu +from .common import BasicFunctionality + + def get_connection( hostname, port, vhost): return kombu.Connection('redis://{}:{}'.format(hostname, port)) @@ -22,52 +24,8 @@ def connection(request): ).get("slaveid", None), ) -@pytest.mark.env('redis') -@pytest.mark.flaky(reruns=5, reruns_delay=2) -def test_connect(connection): - connection.connect() - connection.close() - -@pytest.mark.env('redis') -@pytest.mark.flaky(reruns=5, reruns_delay=2) -def test_publish_consume(connection): - test_queue = kombu.Queue('test', routing_key='test') - - def callback(body, message): - assert body == {'hello': 'world'} - assert message.content_type == 'application/x-python-serialize' - message.delivery_info['routing_key'] == 'test' - message.delivery_info['exchange'] == '' - message.ack() - assert message.payload == body - - with connection as conn: - with conn.channel() as channel: - producer = kombu.Producer(channel) - producer.publish( - {'hello': 'world'}, - retry=True, - exchange=test_queue.exchange, - routing_key=test_queue.routing_key, - declare=[test_queue], - serializer='pickle' - ) - - consumer = kombu.Consumer(conn, [test_queue], accept=['pickle']) - consumer.register_callback(callback) - with consumer: - conn.drain_events(timeout=1) - @pytest.mark.env('redis') @pytest.mark.flaky(reruns=5, reruns_delay=2) -def test_simple_publish_consume(connection): - with connection as conn: - with closing(conn.SimpleQueue('simple_test')) as queue: - queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) - message = queue.get(timeout=1) - assert message.payload == {'Hello': 'World'} - assert message.content_type == 'application/json' - assert message.content_encoding == 'utf-8' - assert message.headers == {'k1': 'v1'} - message.ack() +class test_BasicFunctionality(BasicFunctionality): + pass |