1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
from __future__ import absolute_import
import tempfile
from nose import SkipTest
from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer, Producer
from kombu.tests.utils import TestCase
class test_FilesystemTransport(TestCase):
def setUp(self):
try:
data_folder_in = tempfile.mkdtemp()
data_folder_out = tempfile.mkdtemp()
except Exception:
raise SkipTest('filesystem transport: cannot create tempfiles')
self.c = Connection(transport='filesystem',
transport_options={
'data_folder_in': data_folder_in,
'data_folder_out': data_folder_out,
})
self.p = Connection(transport='filesystem',
transport_options={
'data_folder_in': data_folder_out,
'data_folder_out': data_folder_in,
})
self.e = Exchange('test_transport_filesystem')
self.q = Queue('test_transport_filesystem',
exchange=self.e,
routing_key='test_transport_filesystem')
self.q2 = Queue('test_transport_filesystem2',
exchange=self.e,
routing_key='test_transport_filesystem2')
def test_produce_consume_noack(self):
producer = Producer(self.p.channel(), self.e)
consumer = Consumer(self.c.channel(), self.q, no_ack=True)
for i in range(10):
producer.publish({'foo': i},
routing_key='test_transport_filesystem')
_received = []
def callback(message_data, message):
_received.append(message)
consumer.register_callback(callback)
consumer.consume()
while 1:
if len(_received) == 10:
break
self.c.drain_events()
self.assertEqual(len(_received), 10)
def test_produce_consume(self):
producer_channel = self.p.channel()
consumer_channel = self.c.channel()
producer = Producer(producer_channel, self.e)
consumer1 = Consumer(consumer_channel, self.q)
consumer2 = Consumer(consumer_channel, self.q2)
self.q2(consumer_channel).declare()
for i in range(10):
producer.publish({'foo': i},
routing_key='test_transport_filesystem')
for i in range(10):
producer.publish({'foo': i},
routing_key='test_transport_filesystem2')
_received1 = []
_received2 = []
def callback1(message_data, message):
_received1.append(message)
message.ack()
def callback2(message_data, message):
_received2.append(message)
message.ack()
consumer1.register_callback(callback1)
consumer2.register_callback(callback2)
consumer1.consume()
consumer2.consume()
while 1:
if len(_received1) + len(_received2) == 20:
break
self.c.drain_events()
self.assertEqual(len(_received1) + len(_received2), 20)
# compression
producer.publish({'compressed': True},
routing_key='test_transport_filesystem',
compression='zlib')
m = self.q(consumer_channel).get()
self.assertDictEqual(m.payload, {'compressed': True})
# queue.delete
for i in range(10):
producer.publish({'foo': i},
routing_key='test_transport_filesystem')
self.assertTrue(self.q(consumer_channel).get())
self.q(consumer_channel).delete()
self.q(consumer_channel).declare()
self.assertIsNone(self.q(consumer_channel).get())
# queue.purge
for i in range(10):
producer.publish({'foo': i},
routing_key='test_transport_filesystem2')
self.assertTrue(self.q2(consumer_channel).get())
self.q2(consumer_channel).purge()
self.assertIsNone(self.q2(consumer_channel).get())
|