blob: 6e6585eeba93ac31081b6315d8aca2f5979b03bb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
from __future__ import absolute_import, unicode_literals
import os
import pytest
import kombu
from .common import BasicFunctionality
def get_connection(
hostname, port, vhost):
return kombu.Connection('pyamqp://{}:{}'.format(hostname, port))
@pytest.fixture()
def connection(request):
# this fixture yields plain connections to broker and TLS encrypted
return get_connection(
hostname=os.environ.get('RABBITMQ_HOST', 'localhost'),
port=os.environ.get('RABBITMQ_5672_TCP', '5672'),
vhost=getattr(
request.config, "slaveinput", {}
).get("slaveid", None),
)
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPBasicFunctionality(BasicFunctionality):
pass
|