summaryrefslogtreecommitdiff
path: root/kombu/tests/transport/test_filesystem.py
blob: 6a8d64d1b33a6f4ff279708a05061f3e1f0ce1c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from __future__ import absolute_import

import shutil
import tempfile

from nose import SkipTest

from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer, Producer

from kombu.tests.utils import TestCase


class test_FilesystemTransport(TestCase):
    """Produce/consume round-trip tests for the ``filesystem`` transport.

    ``setUp`` builds two connections over a pair of temporary directories
    with the in/out folders swapped, so the outgoing folder of the producer
    connection (``self.p``) is the incoming folder of the consumer
    connection (``self.c``) and vice versa.
    """

    # Upper bound, in seconds, passed to ``drain_events`` so that a message
    # that never arrives fails the test instead of blocking the run forever.
    drain_timeout = 5

    def _make_data_folder(self):
        """Create a temporary directory and schedule its removal.

        The cleanup is registered immediately, so the directory is removed
        even when a later step of ``setUp`` fails or skips the test.
        """
        folder = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, folder, ignore_errors=True)
        return folder

    def _publish_many(self, producer, routing_key, count=10):
        """Publish ``count`` messages ``{'foo': i}`` using ``routing_key``."""
        for i in range(count):
            producer.publish({'foo': i}, routing_key=routing_key)

    def _drain_until(self, is_done):
        """Drain events on the consumer connection until ``is_done()``.

        Each ``drain_events`` call is bounded by ``drain_timeout``.
        """
        while not is_done():
            self.c.drain_events(timeout=self.drain_timeout)

    def setUp(self):
        """Create the data folders, both connections and the test queues.

        Raises:
            SkipTest: if the temporary directories cannot be created.
        """
        try:
            data_folder_in = self._make_data_folder()
            data_folder_out = self._make_data_folder()
        except Exception:
            raise SkipTest('filesystem transport: cannot create tempfiles')
        self.c = Connection(transport='filesystem',
                            transport_options={
                                'data_folder_in': data_folder_in,
                                'data_folder_out': data_folder_out,
                            })
        # Same folders with the roles swapped: what ``p`` writes, ``c`` reads.
        self.p = Connection(transport='filesystem',
                            transport_options={
                                'data_folder_in': data_folder_out,
                                'data_folder_out': data_folder_in,
                            })
        # Cleanups run last-in, first-out, so both connections are closed
        # before the data folders registered above are deleted.
        self.addCleanup(self.c.close)
        self.addCleanup(self.p.close)
        self.e = Exchange('test_transport_filesystem')
        self.q = Queue('test_transport_filesystem',
                       exchange=self.e,
                       routing_key='test_transport_filesystem')
        self.q2 = Queue('test_transport_filesystem2',
                        exchange=self.e,
                        routing_key='test_transport_filesystem2')

    def test_produce_consume_noack(self):
        """Ten published messages reach a ``no_ack`` consumer."""
        producer = Producer(self.p.channel(), self.e)
        consumer = Consumer(self.c.channel(), self.q, no_ack=True)

        self._publish_many(producer, 'test_transport_filesystem')

        _received = []

        def callback(message_data, message):
            _received.append(message)

        consumer.register_callback(callback)
        consumer.consume()

        self._drain_until(lambda: len(_received) >= 10)

        self.assertEqual(len(_received), 10)

    def test_produce_consume(self):
        """Exercise acked consumption, compression, delete and purge."""
        producer_channel = self.p.channel()
        consumer_channel = self.c.channel()
        producer = Producer(producer_channel, self.e)
        consumer1 = Consumer(consumer_channel, self.q)
        consumer2 = Consumer(consumer_channel, self.q2)
        # Explicitly declare the second queue on the consumer channel.
        self.q2(consumer_channel).declare()

        self._publish_many(producer, 'test_transport_filesystem')
        self._publish_many(producer, 'test_transport_filesystem2')

        _received1 = []
        _received2 = []

        def callback1(message_data, message):
            _received1.append(message)
            message.ack()

        def callback2(message_data, message):
            _received2.append(message)
            message.ack()

        consumer1.register_callback(callback1)
        consumer2.register_callback(callback2)

        consumer1.consume()
        consumer2.consume()

        # Both consumers share one channel; wait for the combined total.
        self._drain_until(
            lambda: len(_received1) + len(_received2) >= 20)

        self.assertEqual(len(_received1) + len(_received2), 20)

        # compression
        producer.publish({'compressed': True},
                         routing_key='test_transport_filesystem',
                         compression='zlib')
        m = self.q(consumer_channel).get()
        self.assertDictEqual(m.payload, {'compressed': True})

        # queue.delete: after delete + re-declare the queue must be empty
        self._publish_many(producer, 'test_transport_filesystem')
        self.assertTrue(self.q(consumer_channel).get())
        self.q(consumer_channel).delete()
        self.q(consumer_channel).declare()
        self.assertIsNone(self.q(consumer_channel).get())

        # queue.purge: after purge the queue must be empty
        self._publish_many(producer, 'test_transport_filesystem2')
        self.assertTrue(self.q2(consumer_channel).get())
        self.q2(consumer_channel).purge()
        self.assertIsNone(self.q2(consumer_channel).get())