summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2020-05-02 22:07:34 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2020-05-03 09:39:28 +0600
commit7c87441feb491370b3b8f65b6cb263268e26b8c2 (patch)
tree0d576493f9b9ee679a4ec5f52254643ddadd15b3
parent75667082e0d6031ef5029956bd8dcff3c44f1d73 (diff)
downloadkombu-7c87441feb491370b3b8f65b6cb263268e26b8c2.tar.gz
Create common class of integration tests
-rw-r--r--t/integration/common.py53
-rw-r--r--t/integration/test_py_amqp.py56
-rw-r--r--t/integration/test_redis.py52
3 files changed, 62 insertions, 99 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
new file mode 100644
index 00000000..745ccef1
--- /dev/null
+++ b/t/integration/common.py
@@ -0,0 +1,53 @@
+from __future__ import absolute_import, unicode_literals
+
+from contextlib import closing
+
+import kombu
+
+
+class BasicFunctionality(object):
+
+ def test_connect(self, connection):
+ connection.connect()
+ connection.close()
+
+ def test_publish_consume(self, connection):
+ test_queue = kombu.Queue('test', routing_key='test')
+
+ def callback(body, message):
+ assert body == {'hello': 'world'}
+ assert message.content_type == 'application/x-python-serialize'
+ message.delivery_info['routing_key'] == 'test'
+ message.delivery_info['exchange'] == ''
+ message.ack()
+ assert message.payload == body
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ producer.publish(
+ {'hello': 'world'},
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle'
+ )
+
+ consumer = kombu.Consumer(
+ conn, [test_queue], accept=['pickle']
+ )
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+
+ def test_simple_publish_consume(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('simple_test')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get(timeout=1)
+ assert message.payload == {'Hello': 'World'}
+ assert message.content_type == 'application/json'
+ assert message.content_encoding == 'utf-8'
+ assert message.headers == {'k1': 'v1'}
+ message.ack()
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
index 30add648..f337be94 100644
--- a/t/integration/test_py_amqp.py
+++ b/t/integration/test_py_amqp.py
@@ -1,11 +1,12 @@
from __future__ import absolute_import, unicode_literals
-from contextlib import closing
import os
import pytest
import kombu
+from .common import BasicFunctionality
+
def get_connection(
hostname, port, vhost):
@@ -26,54 +27,5 @@ def connection(request):
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
-def test_connect(connection):
- connection.connect()
- connection.close()
-
-
-@pytest.mark.env('py-amqp')
-@pytest.mark.flaky(reruns=5, reruns_delay=2)
-def test_publish_consume(connection):
- test_queue = kombu.Queue('test', routing_key='test')
-
- def callback(body, message):
- assert body == {'hello': 'world'}
- assert message.content_type == 'application/x-python-serialize'
- assert message.delivery_info['routing_key'] == 'test'
- assert message.delivery_info['exchange'] == ''
- assert message.headers == {'k1': 'v1'}
- assert message.payload == body
-
- message.ack()
-
- with connection as conn:
- with conn.channel() as channel:
- producer = kombu.Producer(channel)
- producer.publish(
- {'hello': 'world'},
- retry=True,
- headers={'k1': 'v1'},
- exchange=test_queue.exchange,
- routing_key=test_queue.routing_key,
- declare=[test_queue],
- serializer='pickle'
- )
-
- consumer = kombu.Consumer(conn, [test_queue], accept=['pickle'])
- consumer.register_callback(callback)
- with consumer:
- conn.drain_events(timeout=1)
-
-
-@pytest.mark.env('py-amqp')
-@pytest.mark.flaky(reruns=5, reruns_delay=2)
-def test_simple_publish_consume(connection):
- with connection as conn:
- with closing(conn.SimpleQueue('simple_test')) as queue:
- queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
- message = queue.get(timeout=1)
- assert message.payload == {'Hello': 'World'}
- assert message.content_type == 'application/json'
- assert message.content_encoding == 'utf-8'
- assert message.headers == {'k1': 'v1'}
- message.ack()
+class test_BasicFunctionality(BasicFunctionality):
+ pass
diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py
index f4a94a95..6a132bff 100644
--- a/t/integration/test_redis.py
+++ b/t/integration/test_redis.py
@@ -1,11 +1,13 @@
from __future__ import absolute_import, unicode_literals
-from contextlib import closing
import os
import pytest
import kombu
+from .common import BasicFunctionality
+
+
def get_connection(
hostname, port, vhost):
return kombu.Connection('redis://{}:{}'.format(hostname, port))
@@ -22,52 +24,8 @@ def connection(request):
).get("slaveid", None),
)
-@pytest.mark.env('redis')
-@pytest.mark.flaky(reruns=5, reruns_delay=2)
-def test_connect(connection):
- connection.connect()
- connection.close()
-
-@pytest.mark.env('redis')
-@pytest.mark.flaky(reruns=5, reruns_delay=2)
-def test_publish_consume(connection):
- test_queue = kombu.Queue('test', routing_key='test')
-
- def callback(body, message):
- assert body == {'hello': 'world'}
- assert message.content_type == 'application/x-python-serialize'
- message.delivery_info['routing_key'] == 'test'
- message.delivery_info['exchange'] == ''
- message.ack()
- assert message.payload == body
-
- with connection as conn:
- with conn.channel() as channel:
- producer = kombu.Producer(channel)
- producer.publish(
- {'hello': 'world'},
- retry=True,
- exchange=test_queue.exchange,
- routing_key=test_queue.routing_key,
- declare=[test_queue],
- serializer='pickle'
- )
-
- consumer = kombu.Consumer(conn, [test_queue], accept=['pickle'])
- consumer.register_callback(callback)
- with consumer:
- conn.drain_events(timeout=1)
-
@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
-def test_simple_publish_consume(connection):
- with connection as conn:
- with closing(conn.SimpleQueue('simple_test')) as queue:
- queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
- message = queue.get(timeout=1)
- assert message.payload == {'Hello': 'World'}
- assert message.content_type == 'application/json'
- assert message.content_encoding == 'utf-8'
- assert message.headers == {'k1': 'v1'}
- message.ack()
+class test_BasicFunctionality(BasicFunctionality):
+ pass