diff options
author | Ask Solem <ask@celeryproject.org> | 2016-08-22 11:17:59 -0700 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2016-08-23 11:38:54 -0700 |
commit | 6d72e40e5b05d87ac82de739ce9bd854f1679888 (patch) | |
tree | b0b71b33c24fc024fd781ae16fc84af7850a5358 /t/unit/transport/test_filesystem.py | |
parent | d7ffc4d75b6c78255eec41bcdc6b2d54b828f7f5 (diff) | |
download | kombu-6d72e40e5b05d87ac82de739ce9bd854f1679888.tar.gz |
Rewrite tests to use py.test
NOTE: test_qpid was removed as it's breaking horribly.
It's currently trying to import symbols from kombu.transport.qpid
that don't exist.
assertRaises -> pytest.raises
assertTrue
assertTrue - multiline
assertFalse
assertFalse - multiline
assertIsNone
assertIsNone - multiline
assertEqual
assertEqual - multiline
assertNotEqual
assertNotEqual - multiline
assertGreater - multiline
assertIn
assertIn - multiline
assertNotIn
assertNotIn - multiline
assertIsInstance
assertIsInstance - multiline
assertIsNot
assertIsNot - multiline
assertIsNotNone
assertIs
assertIs - multiline
Manual changes
Now depends on case 1.3.1
Diffstat (limited to 't/unit/transport/test_filesystem.py')
-rw-r--r-- | t/unit/transport/test_filesystem.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py new file mode 100644 index 00000000..52a925f0 --- /dev/null +++ b/t/unit/transport/test_filesystem.py @@ -0,0 +1,142 @@ +from __future__ import absolute_import, unicode_literals + +import tempfile + +from case import skip +from case.skip import SkipTest + +from kombu import Connection, Exchange, Queue, Consumer, Producer + + +@skip.if_win32() +class test_FilesystemTransport: + + def setup(self): + self.channels = set() + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + except Exception: + raise SkipTest('filesystem transport: cannot create tempfiles') + self.c = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_in, + 'data_folder_out': data_folder_out, + }) + self.channels.add(self.c.default_channel) + self.p = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_out, + 'data_folder_out': data_folder_in, + }) + self.channels.add(self.p.default_channel) + self.e = Exchange('test_transport_filesystem') + self.q = Queue('test_transport_filesystem', + exchange=self.e, + routing_key='test_transport_filesystem') + self.q2 = Queue('test_transport_filesystem2', + exchange=self.e, + routing_key='test_transport_filesystem2') + + def teardown(self): + # make sure we don't attempt to restore messages at shutdown. + for channel in self.channels: + try: + channel._qos._dirty.clear() + except AttributeError: + pass + try: + channel._qos._delivered.clear() + except AttributeError: + pass + + def _add_channel(self, channel): + self.channels.add(channel) + return channel + + def test_produce_consume_noack(self): + producer = Producer(self._add_channel(self.p.channel()), self.e) + consumer = Consumer(self._add_channel(self.c.channel()), self.q, + no_ack=True) + + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') + + _received = [] + + def callback(message_data, message): + _received.append(message) + + consumer.register_callback(callback) + consumer.consume() + + while 1: + if len(_received) == 10: + break + self.c.drain_events() + + assert len(_received) == 10 + + def test_produce_consume(self): + producer_channel = self._add_channel(self.p.channel()) + consumer_channel = self._add_channel(self.c.channel()) + producer = Producer(producer_channel, self.e) + consumer1 = Consumer(consumer_channel, self.q) + consumer2 = Consumer(consumer_channel, self.q2) + self.q2(consumer_channel).declare() + + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + consumer1.consume() + consumer2.consume() + + while 1: + if len(_received1) + len(_received2) == 20: + break + self.c.drain_events() + + assert len(_received1) + len(_received2) == 20 + + # compression + producer.publish({'compressed': True}, + routing_key='test_transport_filesystem', + compression='zlib') + m = self.q(consumer_channel).get() + assert m.payload == {'compressed': True} + + # queue.delete + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') + assert self.q(consumer_channel).get() + self.q(consumer_channel).delete() + self.q(consumer_channel).declare() + assert self.q(consumer_channel).get() is None + + # queue.purge + for i in range(10): + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') + assert self.q2(consumer_channel).get() + self.q2(consumer_channel).purge() + assert self.q2(consumer_channel).get() is None |