blob: 4dd357b6f94fe639a0463245e6adc95ed7b3e78c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
from funtests import transport
from nose import SkipTest
from kombu.exceptions import VersionMismatch
class test_pika_blocking(transport.TransportCase):
    """Transport test case configured for the ``syncpika`` transport.

    Two tests with these names are overridden here so that they are
    reported as skipped instead of being executed.
    """

    # NOTE(review): presumably both values are read by TransportCase,
    # whose implementation is not visible from this file -- confirm.
    prefix = "syncpika"
    transport = "syncpika"

    def before_connect(self):
        """Skip the test when the pika transport reports a version mismatch.

        Raises:
            SkipTest: if importing ``kombu.transport.pika`` raises
                ``VersionMismatch``.
        """
        try:
            # Imported purely as a probe: the name itself is never used,
            # only the VersionMismatch the import is expected to raise.
            from kombu.transport import pika  # noqa: F401
        except VersionMismatch:
            raise SkipTest("Pika version mismatch")

    def test_produce__consume_large_messages(self, *args, **kwargs):
        """Always skipped: marked as currently failing for sync pika."""
        raise SkipTest("test currently fails for sync pika")

    def test_cyclic_reference_channel(self, *args, **kwargs):
        """Always skipped: marked as a known memory leak."""
        raise SkipTest("known memory leak")
class test_pika_async(transport.TransportCase):
    """Transport test case configured for the ``pika`` transport.

    Two tests with these names are overridden here so that they are
    reported as skipped instead of being executed.
    """

    # NOTE(review): presumably both values are read by TransportCase,
    # whose implementation is not visible from this file -- confirm.
    prefix = "pika"
    transport = "pika"

    def before_connect(self):
        """Skip the test when the pika transport reports a version mismatch.

        Raises:
            SkipTest: if importing ``kombu.transport.pika`` raises
                ``VersionMismatch``.
        """
        try:
            # Imported purely as a probe: the name itself is never used,
            # only the VersionMismatch the import is expected to raise.
            from kombu.transport import pika  # noqa: F401
        except VersionMismatch:
            raise SkipTest("Pika version mismatch")

    def test_produce__consume_large_messages(self, *args, **kwargs):
        """Always skipped: marked as currently failing for async pika."""
        raise SkipTest("test currently fails for async pika")

    def test_cyclic_reference_channel(self, *args, **kwargs):
        """Always skipped: marked as a known memory leak."""
        raise SkipTest("known memory leak")
|