summaryrefslogtreecommitdiff
path: root/t/unit/transport/test_filesystem.py
diff options
context:
space:
mode:
Diffstat (limited to 't/unit/transport/test_filesystem.py')
-rw-r--r--t/unit/transport/test_filesystem.py142
1 files changed, 142 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py
new file mode 100644
index 00000000..52a925f0
--- /dev/null
+++ b/t/unit/transport/test_filesystem.py
@@ -0,0 +1,142 @@
+from __future__ import absolute_import, unicode_literals
+
+import tempfile
+
+from case import skip
+from case.skip import SkipTest
+
+from kombu import Connection, Exchange, Queue, Consumer, Producer
+
+
+@skip.if_win32()
+class test_FilesystemTransport:
+
+ def setup(self):
+ self.channels = set()
+ try:
+ data_folder_in = tempfile.mkdtemp()
+ data_folder_out = tempfile.mkdtemp()
+ except Exception:
+ raise SkipTest('filesystem transport: cannot create tempfiles')
+ self.c = Connection(transport='filesystem',
+ transport_options={
+ 'data_folder_in': data_folder_in,
+ 'data_folder_out': data_folder_out,
+ })
+ self.channels.add(self.c.default_channel)
+ self.p = Connection(transport='filesystem',
+ transport_options={
+ 'data_folder_in': data_folder_out,
+ 'data_folder_out': data_folder_in,
+ })
+ self.channels.add(self.p.default_channel)
+ self.e = Exchange('test_transport_filesystem')
+ self.q = Queue('test_transport_filesystem',
+ exchange=self.e,
+ routing_key='test_transport_filesystem')
+ self.q2 = Queue('test_transport_filesystem2',
+ exchange=self.e,
+ routing_key='test_transport_filesystem2')
+
+ def teardown(self):
+ # make sure we don't attempt to restore messages at shutdown.
+ for channel in self.channels:
+ try:
+ channel._qos._dirty.clear()
+ except AttributeError:
+ pass
+ try:
+ channel._qos._delivered.clear()
+ except AttributeError:
+ pass
+
+ def _add_channel(self, channel):
+ self.channels.add(channel)
+ return channel
+
+ def test_produce_consume_noack(self):
+ producer = Producer(self._add_channel(self.p.channel()), self.e)
+ consumer = Consumer(self._add_channel(self.c.channel()), self.q,
+ no_ack=True)
+
+ for i in range(10):
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem')
+
+ _received = []
+
+ def callback(message_data, message):
+ _received.append(message)
+
+ consumer.register_callback(callback)
+ consumer.consume()
+
+ while 1:
+ if len(_received) == 10:
+ break
+ self.c.drain_events()
+
+ assert len(_received) == 10
+
+ def test_produce_consume(self):
+ producer_channel = self._add_channel(self.p.channel())
+ consumer_channel = self._add_channel(self.c.channel())
+ producer = Producer(producer_channel, self.e)
+ consumer1 = Consumer(consumer_channel, self.q)
+ consumer2 = Consumer(consumer_channel, self.q2)
+ self.q2(consumer_channel).declare()
+
+ for i in range(10):
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem')
+ for i in range(10):
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem2')
+
+ _received1 = []
+ _received2 = []
+
+ def callback1(message_data, message):
+ _received1.append(message)
+ message.ack()
+
+ def callback2(message_data, message):
+ _received2.append(message)
+ message.ack()
+
+ consumer1.register_callback(callback1)
+ consumer2.register_callback(callback2)
+
+ consumer1.consume()
+ consumer2.consume()
+
+ while 1:
+ if len(_received1) + len(_received2) == 20:
+ break
+ self.c.drain_events()
+
+ assert len(_received1) + len(_received2) == 20
+
+ # compression
+ producer.publish({'compressed': True},
+ routing_key='test_transport_filesystem',
+ compression='zlib')
+ m = self.q(consumer_channel).get()
+ assert m.payload == {'compressed': True}
+
+ # queue.delete
+ for i in range(10):
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem')
+ assert self.q(consumer_channel).get()
+ self.q(consumer_channel).delete()
+ self.q(consumer_channel).declare()
+ assert self.q(consumer_channel).get() is None
+
+ # queue.purge
+ for i in range(10):
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem2')
+ assert self.q2(consumer_channel).get()
+ self.q2(consumer_channel).purge()
+ assert self.q2(consumer_channel).get() is None