author     Matus Valo <matusvalo@gmail.com>      2020-04-27 10:14:53 +0200
committer  Asif Saif Uddin <auvipy@gmail.com>    2020-04-30 10:43:50 +0600
commit     3155e9407f3b7169ce13bcf57b13673506476a81 (patch)
tree       bc9ca8e846b68a992c64d6808414968f7a4bf3f3
parent     8f1de37a08318d388a048341ddca12af30019bf3 (diff)
download   kombu-3155e9407f3b7169ce13bcf57b13673506476a81.tar.gz
Added integration testing infrastructure for RabbitMQ
-rw-r--r--  .travis.yml                                       29
-rw-r--r--  conftest.py                                       17
-rw-r--r--  rabbitmq_logs.sh                                  12
-rw-r--r--  t/integration/conftest.py                         10
-rw-r--r--  t/integration/test_py_amqp.py                     29
-rw-r--r--  t/integration/tests/__init__.py                    9
-rw-r--r--  t/integration/tests/test_SLMQ.py                  20
-rw-r--r--  t/integration/tests/test_SQS.py                   21
-rw-r--r--  t/integration/tests/test_amqp.py                   8
-rw-r--r--  t/integration/tests/test_azureservicebus.py       12
-rw-r--r--  t/integration/tests/test_azurestoragequeues.py    16
-rw-r--r--  t/integration/tests/test_librabbitmq.py           11
-rw-r--r--  t/integration/tests/test_mongodb.py               76
-rw-r--r--  t/integration/tests/test_pyamqp.py                 8
-rw-r--r--  t/integration/tests/test_qpid.py                  11
-rw-r--r--  t/integration/tests/test_redis.py                 20
-rw-r--r--  t/integration/tests/test_sqla.py                  13
-rw-r--r--  t/integration/tests/test_zookeeper.py             15
-rw-r--r--  t/integration/transport.py                       305
-rw-r--r--  tox.ini                                           33
20 files changed, 112 insertions(+), 563 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 0c90e8f3..7221640a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,28 +11,38 @@ addons:
env:
  global:
    PYTHONUNBUFFERED=yes
+  matrix:
+    - MATRIX_TOXENV=unit
stages:
  - test
+  - integration
  - lint
+_integration_job_py_amqp: &integration_job_py_amqp
+  script:
+    - tox -v -- -v
+  services:
+    - docker
+  stage: integration
+
jobs:
  include:
    - python: 2.7
-      env: TOXENV=2.7
+      env: TOXENV=2.7-unit
    - python: 3.5
-      env: TOXENV=3.5
+      env: TOXENV=3.5-unit
    - python: 3.6
-      env: TOXENV=3.6
+      env: TOXENV=3.6-unit
    - python: 3.7
-      env: TOXENV=3.7-linux
+      env: TOXENV=3.7-linux-unit
    - python: 3.8
-      env: TOXENV=3.8-linux
+      env: TOXENV=3.8-linux-unit
    - python: pypy2.7-7.1.1
-      env: TOXENV=pypy
+      env: TOXENV=pypy-unit
      dist: xenial
    - python: pypy3.5-7.0
-      env: TOXENV=pypy3
+      env: TOXENV=pypy3-unit
      dist: xenial
    - env: TOXENV=flake8
      stage: lint
@@ -44,10 +54,13 @@ jobs:
    - python: '3.6'
      env: TOXENV=pydocstyle
      stage: lint
+    - python: 2.7
+      <<: *integration_job_py_amqp
+      env: TOXENV=2.7-integration-py-amqp
install:
  - pip --disable-pip-version-check install -U pip setuptools wheel | cat
-  - pip --disable-pip-version-check install -U tox | cat
+  - pip --disable-pip-version-check install -U tox tox-docker | cat
script: tox -v -- -v
after_success:
  - .tox/$TRAVIS_PYTHON_VERSION/bin/coverage xml
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 00000000..f6d7d655
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,17 @@
+import pytest
+def pytest_addoption(parser):
+    parser.addoption("-E", action="append", metavar="NAME",
+                     help="only run tests matching the environment NAME.")
+
+def pytest_configure(config):
+    # register an additional marker
+    config.addinivalue_line("markers",
+                            "env(name): mark test to run only on named environment")
+
+def pytest_runtest_setup(item):
+    envnames = [mark.args[0] for mark in item.iter_markers(name='env')]
+    if envnames:
+        if item.config.getoption("-E") is None or len(set(item.config.getoption("-E")) & set(envnames)) == 0:
+            # skip the test if its environment was not selected via -E
+            pytest.skip("test requires env in %r" % envnames)
+
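
The root conftest.py above gates tests behind an "env" marker: a test decorated with @pytest.mark.env(NAME) is still collected, but is skipped unless pytest is invoked with -E NAME (the option may be repeated). A minimal usage sketch, with a purely illustrative environment name that is not part of this commit:

import pytest


@pytest.mark.env('redis')  # illustrative environment name, not from this commit
def test_redis_specific_behaviour():
    # Skipped unless the run is invoked as: py.test -E redis
    assert True
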
diff --git a/rabbitmq_logs.sh b/rabbitmq_logs.sh
new file mode 100644
index 00000000..387a0b0a
--- /dev/null
+++ b/rabbitmq_logs.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+containers=$(sudo docker ps -q | tail -n +1)
+
+for item in ${containers//\\n/}
+do
+    env=$(sudo docker inspect -f '{{range $index, $value := .Config.Env}}{{$value}} {{end}}' $item);
+    if [[ $env == *"PYAMQP_INTEGRATION_INSTANCE=1"* ]]; then
+        grep -m1 'Server startup complete' <(sudo docker logs -f $item)
+        sudo docker logs $item
+    fi
+done;
diff --git a/t/integration/conftest.py b/t/integration/conftest.py
new file mode 100644
index 00000000..4880e5c0
--- /dev/null
+++ b/t/integration/conftest.py
@@ -0,0 +1,10 @@
+from __future__ import absolute_import, unicode_literals
+
+import os
+import subprocess
+
+
+def pytest_sessionfinish(session, exitstatus):
+    tox_env_dir = os.environ.get('TOX_WORK_DIR')
+    if exitstatus and tox_env_dir:
+        subprocess.call(["bash", "./rabbitmq_logs.sh"])
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
new file mode 100644
index 00000000..c247b833
--- /dev/null
+++ b/t/integration/test_py_amqp.py
@@ -0,0 +1,29 @@
+from __future__ import absolute_import, unicode_literals
+
+import os
+import pytest
+import kombu
+
+
+def get_connection(
+        hostname, port, vhost):
+    return kombu.Connection('amqp://{}:{}'.format(hostname, port))
+
+
+@pytest.fixture()
+def connection(request):
+    # this fixture returns a plain (non-TLS) connection to the broker
+    return get_connection(
+        hostname=os.environ.get('RABBITMQ_HOST', 'localhost'),
+        port=os.environ.get('RABBITMQ_5672_TCP', '5672'),
+        vhost=getattr(
+            request.config, "slaveinput", {}
+        ).get("slaveid", None),
+    )
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+def test_connect(connection):
+    connection.connect()
+    connection.close()
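
test_connect above only verifies that a broker connection can be opened and closed. A possible follow-up test, not part of this commit, could reuse the same connection fixture for a publish/consume round trip via kombu's SimpleQueue helper; a sketch under that assumption:

import pytest


@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
def test_publish_consume_roundtrip(connection):
    # Round-trip a payload through a temporary queue on the RabbitMQ container.
    connection.connect()
    queue = connection.SimpleQueue('test_roundtrip')  # hypothetical queue name
    try:
        queue.put({'hello': 'world'})
        message = queue.get(block=True, timeout=10)
        assert message.payload == {'hello': 'world'}
        message.ack()
    finally:
        queue.close()
        connection.close()
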
diff --git a/t/integration/tests/__init__.py b/t/integration/tests/__init__.py
deleted file mode 100644
index 2347cc18..00000000
--- a/t/integration/tests/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-import os
-import sys
-
-sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
-print(sys.path[0])
-sys.path.insert(0, os.getcwd())
-print(sys.path[0])
diff --git a/t/integration/tests/test_SLMQ.py b/t/integration/tests/test_SLMQ.py
deleted file mode 100644
index 33ba76ad..00000000
--- a/t/integration/tests/test_SLMQ.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_environ('SLMQ_ACCOUNT')
-@skip.unless_environ('SL_USERNAME')
-@skip.unless_environ('SL_API_KEY')
-@skip.unless_environ('SLMQ_HOST')
-@skip.unless_environ('SLMQ_SECURE')
-class test_SLMQ(transport.TransportCase):
- transport = 'SLMQ'
- prefix = 'slmq'
- event_loop_max = 100
- message_size_limit = 4192
- reliable_purge = False
- #: does not guarantee FIFO order, even in simple cases.
- suppress_disorder_warning = True
diff --git a/t/integration/tests/test_SQS.py b/t/integration/tests/test_SQS.py
deleted file mode 100644
index 4949bcad..00000000
--- a/t/integration/tests/test_SQS.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_environ('AWS_ACCESS_KEY_ID')
-@skip.unless_environ('AWS_SECRET_ACCESS_KEY')
-@skip.unless_module('boto3')
-class test_SQS(transport.TransportCase):
- transport = 'SQS'
- prefix = 'sqs'
- event_loop_max = 100
- message_size_limit = 4192 # SQS max body size / 2.
- reliable_purge = False
- #: does not guarantee FIFO order, even in simple cases
- suppress_disorder_warning = True
-
- def after_connect(self, connection):
- connection.channel().sqs
diff --git a/t/integration/tests/test_amqp.py b/t/integration/tests/test_amqp.py
deleted file mode 100644
index 39dfbbf1..00000000
--- a/t/integration/tests/test_amqp.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-
-class test_pyamqp(transport.TransportCase):
- transport = 'pyamqp'
- prefix = 'pyamqp'
diff --git a/t/integration/tests/test_azureservicebus.py b/t/integration/tests/test_azureservicebus.py
deleted file mode 100644
index 98dc7340..00000000
--- a/t/integration/tests/test_azureservicebus.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from t.integration import transport
-
-from case import skip
-
-
-@skip.unless_module('azure.servicebus')
-class test_azureservicebus(transport.TransportCase):
- transport = 'azureservicebus'
- prefix = 'azureservicebus'
- message_size_limit = 32000
diff --git a/t/integration/tests/test_azurestoragequeues.py b/t/integration/tests/test_azurestoragequeues.py
deleted file mode 100644
index eb66c6be..00000000
--- a/t/integration/tests/test_azurestoragequeues.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from t.integration import transport
-
-from case import skip
-
-
-@skip.unless_module('azure.storage.queue')
-class test_azurestoragequeues(transport.TransportCase):
- transport = 'azurestoragequeues'
- prefix = 'azurestoragequeues'
- event_loop_max = 100
- message_size_limit = 32000
- reliable_purge = False
- #: does not guarantee FIFO order, even in simple cases.
- suppress_disorder_warning = True
diff --git a/t/integration/tests/test_librabbitmq.py b/t/integration/tests/test_librabbitmq.py
deleted file mode 100644
index 11840ca1..00000000
--- a/t/integration/tests/test_librabbitmq.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_module('librabbitmq')
-class test_librabbitmq(transport.TransportCase):
- transport = 'librabbitmq'
- prefix = 'librabbitmq'
diff --git a/t/integration/tests/test_mongodb.py b/t/integration/tests/test_mongodb.py
deleted file mode 100644
index dcf00ef1..00000000
--- a/t/integration/tests/test_mongodb.py
+++ /dev/null
@@ -1,76 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from kombu import Consumer, Producer, Exchange, Queue
-from kombu.five import range
-from kombu.utils.compat import nested
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_module('pymongo')
-class test_mongodb(transport.TransportCase):
- transport = 'mongodb'
- prefix = 'mongodb'
- event_loop_max = 100
-
- def after_connect(self, connection):
- connection.channel().client # evaluate connection.
- self.c = self.connection # shortcut
-
- def test_fanout(self, name='test_mongodb_fanout'):
- if not self.verify_alive():
- return
- c = self.connection
- self.e = Exchange(name, type='fanout')
- self.q = Queue(name, exchange=self.e, routing_key=name)
- self.q2 = Queue(name + '2', exchange=self.e, routing_key=name + '2')
-
- channel = c.default_channel
- producer = Producer(channel, self.e)
- consumer1 = Consumer(channel, self.q)
- consumer2 = Consumer(channel, self.q2)
- self.q2(channel).declare()
-
- for i in range(10):
- producer.publish({'foo': i}, routing_key=name)
- for i in range(10):
- producer.publish({'foo': i}, routing_key=name + '2')
-
- _received1 = []
- _received2 = []
-
- def callback1(message_data, message):
- _received1.append(message)
- message.ack()
-
- def callback2(message_data, message):
- _received2.append(message)
- message.ack()
-
- consumer1.register_callback(callback1)
- consumer2.register_callback(callback2)
-
- with nested(consumer1, consumer2):
-
- while 1:
- if len(_received1) + len(_received2) == 20:
- break
- c.drain_events(timeout=60)
- self.assertEqual(len(_received1) + len(_received2), 20)
-
- # queue.delete
- for i in range(10):
- producer.publish({'foo': i}, routing_key=name)
- self.assertTrue(self.q(channel).get())
- self.q(channel).delete()
- self.q(channel).declare()
- self.assertIsNone(self.q(channel).get())
-
- # queue.purge
- for i in range(10):
- producer.publish({'foo': i}, routing_key=name + '2')
- self.assertTrue(self.q2(channel).get())
- self.q2(channel).purge()
- self.assertIsNone(self.q2(channel).get())
diff --git a/t/integration/tests/test_pyamqp.py b/t/integration/tests/test_pyamqp.py
deleted file mode 100644
index 39dfbbf1..00000000
--- a/t/integration/tests/test_pyamqp.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-
-class test_pyamqp(transport.TransportCase):
- transport = 'pyamqp'
- prefix = 'pyamqp'
diff --git a/t/integration/tests/test_qpid.py b/t/integration/tests/test_qpid.py
deleted file mode 100644
index d7e53e82..00000000
--- a/t/integration/tests/test_qpid.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_module('qpid.messaging')
-class test_qpid(transport.TransportCase):
- transport = 'qpid'
- prefix = 'qpid'
diff --git a/t/integration/tests/test_redis.py b/t/integration/tests/test_redis.py
deleted file mode 100644
index 48c3916e..00000000
--- a/t/integration/tests/test_redis.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_module('redis')
-class test_redis(transport.TransportCase):
- transport = 'redis'
- prefix = 'redis'
-
- def after_connect(self, connection):
- client = connection.channel().client
- client.info()
-
- def test_cannot_connect_raises_connection_error(self):
- conn = self.get_connection(port=65534)
- with self.assertRaises(conn.connection_errors):
- conn.connect()
diff --git a/t/integration/tests/test_sqla.py b/t/integration/tests/test_sqla.py
deleted file mode 100644
index f145c07e..00000000
--- a/t/integration/tests/test_sqla.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_module('sqlalchemy')
-class test_sqla(transport.TransportCase):
- transport = 'sqlalchemy'
- prefix = 'sqlalchemy'
- event_loop_max = 10
- connection_options = {'hostname': 'sqla+sqlite://'}
diff --git a/t/integration/tests/test_zookeeper.py b/t/integration/tests/test_zookeeper.py
deleted file mode 100644
index 147bfdc9..00000000
--- a/t/integration/tests/test_zookeeper.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-from funtests import transport
-
-from kombu.tests.case import skip
-
-
-@skip.unless_module('kazoo')
-class test_zookeeper(transport.TransportCase):
- transport = 'zookeeper'
- prefix = 'zookeeper'
- event_loop_max = 100
-
- def after_connect(self, connection):
- connection.channel().client
diff --git a/t/integration/transport.py b/t/integration/transport.py
deleted file mode 100644
index ef3186a7..00000000
--- a/t/integration/transport.py
+++ /dev/null
@@ -1,305 +0,0 @@
-from __future__ import absolute_import, print_function, unicode_literals
-
-import random
-import socket
-import string
-import time
-import unittest
-import warnings
-import weakref
-
-from case.skip import SkipTest
-
-from hashlib import sha256 as _digest
-
-from kombu import Connection
-from kombu import Exchange, Queue
-from kombu.five import buffer_t, range
-from kombu.utils.encoding import str_to_bytes
-
-
-def _nobuf(x):
- return [str(i) if isinstance(i, buffer_t) else i for i in x]
-
-
-def consumeN(conn, consumer, n=1, timeout=30):
- messages = []
-
- def callback(message_data, message):
- messages.append(message_data)
- message.ack()
-
- prev, consumer.callbacks = consumer.callbacks, [callback]
- consumer.consume()
-
- seconds = 0
- while True:
- try:
- conn.drain_events(timeout=1)
- except socket.timeout:
- seconds += 1
- msg = 'Received %s/%s messages. %s seconds passed.' % (
- len(messages), n, seconds)
- if seconds >= timeout:
- raise socket.timeout(msg)
- if seconds > 1:
- print(msg)
- if len(messages) >= n:
- break
-
- consumer.cancel()
- consumer.callback = prev
- return messages
-
-
-class TransportCase(unittest.TestCase):
- transport = None
- prefix = None
- sep = '.'
- userid = None
- password = None
- event_loop_max = 100
- connection_options = {}
- suppress_disorder_warning = False
- reliable_purge = True
-
- connected = False
- skip_test_reason = None
-
- message_size_limit = None
-
- def before_connect(self):
- pass
-
- def after_connect(self, connection):
- pass
-
- def setUp(self):
- if self.transport:
- try:
- self.before_connect()
- except SkipTest as exc:
- self.skip_test_reason = str(exc)
- else:
- self.do_connect()
- self.exchange = Exchange(self.prefix, 'direct')
- self.queue = Queue(self.prefix, self.exchange, self.prefix)
-
- def purge(self, names):
- chan = self.connection.channel()
- total = 0
- for queue in names:
- while 1:
- # ensure the queue is completly empty
- purged = chan.queue_purge(queue=queue)
- if not purged:
- break
- total += purged
- chan.close()
- return total
-
- def get_connection(self, **options):
- if self.userid:
- options.setdefault('userid', self.userid)
- if self.password:
- options.setdefault('password', self.password)
- return Connection(transport=self.transport, **options)
-
- def do_connect(self):
- self.connection = self.get_connection(**self.connection_options)
- try:
- self.connection.connect()
- self.after_connect(self.connection)
- except self.connection.connection_errors:
- self.skip_test_reason = '{0} transport cannot connect'.format(
- self.transport,
- )
- else:
- self.connected = True
-
- def verify_alive(self):
- if self.transport:
- if not self.connected:
- raise SkipTest(self.skip_test_reason)
- return True
-
- def purge_consumer(self, consumer):
- return self.purge([queue.name for queue in consumer.queues])
-
- def test_produce__consume(self):
- if not self.verify_alive():
- return
- chan1 = self.connection.channel()
- consumer = chan1.Consumer(self.queue)
- self.purge_consumer(consumer)
- producer = chan1.Producer(self.exchange)
- producer.publish({'foo': 'bar'}, routing_key=self.prefix)
- message = consumeN(self.connection, consumer)
- self.assertDictEqual(message[0], {'foo': 'bar'})
- chan1.close()
- self.purge([self.queue.name])
-
- def test_purge(self):
- if not self.verify_alive():
- return
- chan1 = self.connection.channel()
- consumer = chan1.Consumer(self.queue)
- self.purge_consumer(consumer)
-
- producer = chan1.Producer(self.exchange)
- for i in range(10):
- producer.publish({'foo': 'bar'}, routing_key=self.prefix)
- if self.reliable_purge:
- self.assertEqual(consumer.purge(), 10)
- self.assertEqual(consumer.purge(), 0)
- else:
- purged = 0
- while purged < 9:
- purged += self.purge_consumer(consumer)
-
- def _digest(self, data):
- return _digest(str_to_bytes(data)).hexdigest()
-
- def test_produce__consume_large_messages(
- self, bytes=1048576, n=10,
- charset=string.punctuation + string.ascii_letters + string.digits):
- if not self.verify_alive():
- return
- bytes = min(x for x in [bytes, self.message_size_limit] if x)
- messages = [''.join(random.choice(charset)
- for j in range(bytes)) + '--%s' % n
- for i in range(n)]
- digests = []
- chan1 = self.connection.channel()
- consumer = chan1.Consumer(self.queue)
- self.purge_consumer(consumer)
- producer = chan1.Producer(self.exchange)
- for i, message in enumerate(messages):
- producer.publish({'text': message,
- 'i': i}, routing_key=self.prefix)
- digests.append(self._digest(message))
-
- received = [(msg['i'], msg['text'])
- for msg in consumeN(self.connection, consumer, n)]
- self.assertEqual(len(received), n)
- ordering = [i for i, _ in received]
- if ordering != list(range(n)) and not self.suppress_disorder_warning:
- warnings.warn(
- '%s did not deliver messages in FIFO order: %r' % (
- self.transport, ordering))
-
- for i, text in received:
- if text != messages[i]:
- raise AssertionError('%i: %r is not %r' % (
- i, text[-100:], messages[i][-100:]))
- self.assertEqual(self._digest(text), digests[i])
-
- chan1.close()
- self.purge([self.queue.name])
-
- def P(self, rest):
- return '%s%s%s' % (self.prefix, self.sep, rest)
-
- def test_produce__consume_multiple(self):
- if not self.verify_alive():
- return
- chan1 = self.connection.channel()
- producer = chan1.Producer(self.exchange)
- b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1)
- b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1)
- b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1)
- [q.declare() for q in (b1, b2, b3)]
- self.purge([b1.name, b2.name, b3.name])
-
- producer.publish('b1', routing_key='b1')
- producer.publish('b2', routing_key='b2')
- producer.publish('b3', routing_key='b3')
- chan1.close()
-
- chan2 = self.connection.channel()
- consumer = chan2.Consumer([b1, b2, b3])
- messages = consumeN(self.connection, consumer, 3)
- self.assertItemsEqual(_nobuf(messages), ['b1', 'b2', 'b3'])
- chan2.close()
- self.purge([self.P('b1'), self.P('b2'), self.P('b3')])
-
- def test_timeout(self):
- if not self.verify_alive():
- return
- chan = self.connection.channel()
- self.purge([self.queue.name])
- consumer = chan.Consumer(self.queue)
- self.assertRaises(
- socket.timeout, self.connection.drain_events, timeout=0.3,
- )
- consumer.cancel()
- chan.close()
-
- def test_basic_get(self):
- if not self.verify_alive():
- return
- chan1 = self.connection.channel()
- producer = chan1.Producer(self.exchange)
- chan2 = self.connection.channel()
- queue = Queue(self.P('basic_get'), self.exchange, 'basic_get')
- queue = queue(chan2)
- queue.declare()
- producer.publish({'basic.get': 'this'}, routing_key='basic_get')
- chan1.close()
-
- for i in range(self.event_loop_max):
- m = queue.get()
- if m:
- break
- time.sleep(0.1)
- self.assertEqual(m.payload, {'basic.get': 'this'})
- self.purge([queue.name])
- chan2.close()
-
- def test_cyclic_reference_transport(self):
- if not self.verify_alive():
- return
-
- def _createref():
- conn = self.get_connection()
- conn.transport
- conn.close()
- return weakref.ref(conn)
-
- self.assertIsNone(_createref()())
-
- def test_cyclic_reference_connection(self):
- if not self.verify_alive():
- return
-
- def _createref():
- conn = self.get_connection()
- conn.connect()
- conn.close()
- return weakref.ref(conn)
-
- self.assertIsNone(_createref()())
-
- def test_cyclic_reference_channel(self):
- if not self.verify_alive():
- return
-
- def _createref():
- conn = self.get_connection()
- conn.connect()
- chanrefs = []
- try:
- for i in range(100):
- channel = conn.channel()
- chanrefs.append(weakref.ref(channel))
- channel.close()
- finally:
- conn.close()
- return chanrefs
-
- for chanref in _createref():
- self.assertIsNone(chanref())
-
- def tearDown(self):
- if self.transport and self.connected:
- self.connection.close()
diff --git a/tox.ini b/tox.ini
index 4ac097ae..697ddf39 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,34 +1,32 @@
[tox]
envlist =
-    2.7
-    pypy
-    pypy3
-    3.5
-    3.6
-    3.7-{linux,windows}
-    3.8-{linux,windows}
+    {2.7,pypy,pypy3,3.5,3.6,3.7-{linux,windows},3.8-{linux,windows}}-unit
+    {2.7,pypy,pypy3,3.5,3.6,3.7-{linux,windows},3.8-{linux,windows}}-integration-py-amqp
    flake8
    flakeplus
    apicheck
    pydocstyle
+requires = tox-docker
+
[testenv]
sitepackages = False
setenv = C_DEBUG_TEST = 1
passenv =
    DISTUTILS_USE_SDK
deps=
+    -r{toxinidir}/requirements/dev.txt
    apicheck,2.7,pypy,pypy3,3.5,3.6,3.7-linux,3.7-windows,3.8-linux,3.8-windows: -r{toxinidir}/requirements/default.txt
    apicheck,2.7,pypy,pypy3,3.5,3.6,3.7-linux,3.7-windows,3.8-linux,3.8-windows: -r{toxinidir}/requirements/test.txt
    apicheck,2.7,pypy,pypy3,3.5,3.6,3.7-linux,3.8-linux: -r{toxinidir}/requirements/test-ci.txt
    2.7,pypy: -r{toxinidir}/requirements/test-ci-py2.txt
    3.7-windows: -r{toxinidir}/requirements/test-ci-windows.txt
-
    apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt
    flake8,flakeplus,pydocstyle: -r{toxinidir}/requirements/pkgutils.txt
-commands = pip install -U -r{toxinidir}/requirements/dev.txt
-    python -bb -m pytest -rxs -xv --cov=kombu --cov-report=xml --no-cov-on-fail {posargs}
+commands =
+    unit: python -bb -m pytest -rxs -xv --cov=kombu --cov-report=xml --no-cov-on-fail {posargs}
+    integration-py-amqp: py.test -xv -E py-amqp t/integration {posargs:-n2}
basepython =
    2.7,flakeplus,flake8,linkcheck,cov: python2.7
@@ -40,6 +38,21 @@ basepython =
install_command = python -m pip --disable-pip-version-check install {opts} {packages}
+docker =
+    integration-py-amqp: rabbitmq:alpine
+
+dockerenv =
+    PYAMQP_INTEGRATION_INSTANCE=1
+    RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit tcp_listeners [5672]
+
+[docker:rabbitmq:alpine]
+ports = 5672:5672/tcp
+healthcheck_cmd = /bin/bash -c 'rabbitmq-diagnostics ping -q'
+healthcheck_interval = 10
+healthcheck_timeout = 10
+healthcheck_retries = 30
+healthcheck_start_period = 5
+
[testenv:apicheck]
commands = pip install -U -r{toxinidir}/requirements/dev.txt
    sphinx-build -j2 -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck