1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
from __future__ import annotations
import os
import pytest
import kombu
from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
BasePriority, BaseTimeToLive, BasicFunctionality)
def get_connection(hostname, port, vhost):
return kombu.Connection(f'pyamqp://{hostname}:{port}')
def get_failover_connection(hostname, port, vhost):
return kombu.Connection(
f'pyamqp://localhost:12345;pyamqp://{hostname}:{port}'
)
@pytest.fixture()
def invalid_connection():
return kombu.Connection('pyamqp://localhost:12345')
@pytest.fixture()
def connection(request):
return get_connection(
hostname=os.environ.get('RABBITMQ_HOST', 'localhost'),
port=os.environ.get('RABBITMQ_5672_TCP', '5672'),
vhost=getattr(
request.config, "slaveinput", {}
).get("slaveid", None),
)
@pytest.fixture()
def failover_connection(request):
return get_failover_connection(
hostname=os.environ.get('RABBITMQ_HOST', 'localhost'),
port=os.environ.get('RABBITMQ_5672_TCP', '5672'),
vhost=getattr(
request.config, "slaveinput", {}
).get("slaveid", None),
)
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPBasicFunctionality(BasicFunctionality):
pass
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPBaseExchangeTypes(BaseExchangeTypes):
pass
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPTimeToLive(BaseTimeToLive):
pass
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPPriority(BasePriority):
pass
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPFailover(BaseFailover):
pass
@pytest.mark.env('py-amqp')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPMessage(BaseMessage):
pass
|