diff options
Diffstat (limited to 't/unit/transport/test_filesystem.py')
-rw-r--r-- | t/unit/transport/test_filesystem.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py new file mode 100644 index 00000000..52a925f0 --- /dev/null +++ b/t/unit/transport/test_filesystem.py @@ -0,0 +1,142 @@ +from __future__ import absolute_import, unicode_literals + +import tempfile + +from case import skip +from case.skip import SkipTest + +from kombu import Connection, Exchange, Queue, Consumer, Producer + + +@skip.if_win32() +class test_FilesystemTransport: + + def setup(self): + self.channels = set() + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + except Exception: + raise SkipTest('filesystem transport: cannot create tempfiles') + self.c = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_in, + 'data_folder_out': data_folder_out, + }) + self.channels.add(self.c.default_channel) + self.p = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_out, + 'data_folder_out': data_folder_in, + }) + self.channels.add(self.p.default_channel) + self.e = Exchange('test_transport_filesystem') + self.q = Queue('test_transport_filesystem', + exchange=self.e, + routing_key='test_transport_filesystem') + self.q2 = Queue('test_transport_filesystem2', + exchange=self.e, + routing_key='test_transport_filesystem2') + + def teardown(self): + # make sure we don't attempt to restore messages at shutdown. + for channel in self.channels: + try: + channel._qos._dirty.clear() + except AttributeError: + pass + try: + channel._qos._delivered.clear() + except AttributeError: + pass + + def _add_channel(self, channel): + self.channels.add(channel) + return channel + + def test_produce_consume_noack(self): + producer = Producer(self._add_channel(self.p.channel()), self.e) + consumer = Consumer(self._add_channel(self.c.channel()), self.q, + no_ack=True) + + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') + + _received = [] + + def callback(message_data, message): + _received.append(message) + + consumer.register_callback(callback) + consumer.consume() + + while 1: + if len(_received) == 10: + break + self.c.drain_events() + + assert len(_received) == 10 + + def test_produce_consume(self): + producer_channel = self._add_channel(self.p.channel()) + consumer_channel = self._add_channel(self.c.channel()) + producer = Producer(producer_channel, self.e) + consumer1 = Consumer(consumer_channel, self.q) + consumer2 = Consumer(consumer_channel, self.q2) + self.q2(consumer_channel).declare() + + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + consumer1.consume() + consumer2.consume() + + while 1: + if len(_received1) + len(_received2) == 20: + break + self.c.drain_events() + + assert len(_received1) + len(_received2) == 20 + + # compression + producer.publish({'compressed': True}, + routing_key='test_transport_filesystem', + compression='zlib') + m = self.q(consumer_channel).get() + assert m.payload == {'compressed': True} + + # queue.delete + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') + assert self.q(consumer_channel).get() + self.q(consumer_channel).delete() + self.q(consumer_channel).declare() + assert self.q(consumer_channel).get() is None + + # queue.purge + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') + assert self.q2(consumer_channel).get() + self.q2(consumer_channel).purge() + assert self.q2(consumer_channel).get() is None |