diff options
-rw-r--r-- | kombu/tests/transport/test_filesystem.py | 108 | ||||
-rw-r--r-- | kombu/transport/__init__.py | 1 | ||||
-rw-r--r-- | kombu/transport/filesystem.py | 180 |
3 files changed, 289 insertions, 0 deletions
diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py new file mode 100644 index 00000000..547f5a70 --- /dev/null +++ b/kombu/tests/transport/test_filesystem.py @@ -0,0 +1,108 @@ +from __future__ import absolute_import +from __future__ import with_statement + +import tempfile + +from kombu.connection import Connection +from kombu.entity import Exchange, Queue +from kombu.messaging import Consumer, Producer + +from kombu.tests.utils import TestCase + + +class test_FilesystemTransport(TestCase): + + def setUp(self): + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + self.c = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_in, 'data_folder_out': data_folder_out}) + self.p = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_out, 'data_folder_out': data_folder_in}) + self.e = Exchange('test_transport_filesystem') + self.q = Queue('test_transport_filesystem', + exchange=self.e, + routing_key='test_transport_filesystem') + self.q2 = Queue('test_transport_filesystem2', + exchange=self.e, + routing_key='test_transport_filesystem2') + + def test_produce_consume_noack(self): + producer = Producer(self.p.channel(), self.e) + consumer = Consumer(self.c.channel(), self.q, no_ack=True) + + for i in range(10): + producer.publish({'foo': i}, routing_key='test_transport_filesystem') + + _received = [] + + def callback(message_data, message): + _received.append(message) + + consumer.register_callback(callback) + consumer.consume() + + while 1: + if len(_received) == 10: + break + self.c.drain_events() + + self.assertEqual(len(_received), 10) + + def test_produce_consume(self): + producer_channel = self.p.channel() + consumer_channel = self.c.channel() + producer = Producer(producer_channel, self.e) + consumer1 = Consumer(consumer_channel, self.q) + consumer2 = Consumer(consumer_channel, self.q2) + self.q2(consumer_channel).declare() + + for i in range(10): + producer.publish({'foo': i}, routing_key='test_transport_filesystem') + for i in range(10): + producer.publish({'foo': i}, routing_key='test_transport_filesystem2') + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + consumer1.consume() + consumer2.consume() + + while 1: + if len(_received1) + len(_received2) == 20: + break + self.c.drain_events() + + self.assertEqual(len(_received1) + len(_received2), 20) + + # compression + producer.publish({'compressed': True}, + routing_key='test_transport_filesystem', + compression='zlib') + m = self.q(consumer_channel).get() + self.assertDictEqual(m.payload, {'compressed': True}) + + # queue.delete + for i in range(10): + producer.publish({'foo': i}, routing_key='test_transport_filesystem') + self.assertTrue(self.q(consumer_channel).get()) + self.q(consumer_channel).delete() + self.q(consumer_channel).declare() + self.assertIsNone(self.q(consumer_channel).get()) + + # queue.purge + for i in range(10): + producer.publish({'foo': i}, routing_key='test_transport_filesystem2') + self.assertTrue(self.q2(consumer_channel).get()) + self.q2(consumer_channel).purge() + self.assertIsNone(self.q2(consumer_channel).get()) + diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 7605cd83..f3d85ad5 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -69,6 +69,7 @@ TRANSPORT_ALIASES = { 'ghettoq.taproot.MongoDB': _ghettoq('MongoDB', 'mongodb'), 'ghettoq.taproot.Beanstalk': _ghettoq('Beanstalk', 'beanstalk'), 'ghettoq.taproot.CouchDB': _ghettoq('CouchDB', 'couchdb'), + 'filesystem': 'kombu.transport.filesystem.Transport', } _transport_cache = {} diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py new file mode 100644 index 00000000..bd8db8f6 --- /dev/null +++ b/kombu/transport/filesystem.py @@ -0,0 +1,180 @@ +"""Kombu transport using a filesystem as the message store.""" + +from Queue import Empty + +from anyjson import loads, dumps + +import os +import shutil +import time +import uuid +import tempfile + +from . import virtual +from kombu.exceptions import StdChannelError +from kombu.utils import cached_property + +VERSION = (1, 0, 0) +__version__ = ".".join(map(str, VERSION)) + +# needs win32all to work on Windows +if os.name == 'nt': + + import win32con, win32file, pywintypes + + LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK + LOCK_SH = 0 # the default + LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY + __overlapped = pywintypes.OVERLAPPED() + + def lock(file, flags): + hfile = win32file._get_osfhandle(file.fileno()) + win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped) + + def unlock(file): + hfile = win32file._get_osfhandle(file.fileno()) + win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped) + +elif os.name == 'posix': + + import fcntl + from fcntl import LOCK_EX, LOCK_SH, LOCK_NB + + def lock(file, flags): + fcntl.flock(file.fileno(), flags) + + def unlock(file): + fcntl.flock(file.fileno(), fcntl.LOCK_UN) +else: + raise RuntimeError('Filesystem plugin only defined for nt and posix platforms') + +class Channel(virtual.Channel): + + def _put(self, queue, payload, **kwargs): + """Put `message` onto `queue`.""" + + filename = '%s_%s.%s.msg' % (int(round(time.time()*1000)), uuid.uuid4(), queue) + filename = os.path.join(self.data_folder_out, filename) + + try: + f = open(filename, 'wb') + lock(f, LOCK_EX) + f.write(dumps(payload)) + except IOError, OSError: + raise Exception('Filename [%s] could not be placed into folder.' % filename) + finally: + unlock(f) + f.close() + + def _get(self, queue): + """Get next message from `queue`.""" + + queue_find = '.' + queue + '.msg' + folder = os.listdir(self.data_folder_in) + folder = sorted(folder) + while len(folder) > 0: + filename = folder.pop(0) + + # only handle message for the requested queue + if filename.find(queue_find) < 0: + continue + + if self.store_processed: + processed_folder = self.processed_folder + else: + processed_folder = tempfile.gettempdir() + + try: + # move the file to the tmp/processed folder + shutil.move(os.path.join(self.data_folder_in, filename), processed_folder) + except IOError: + pass # file could be locked, or removed in meantime so ignore + + filename = os.path.join(processed_folder, filename) + try: + f = open(filename, 'rb') + payload = f.read() + f.close() + if not self.store_processed: + os.remove(filename) + except IOError, OSError: + raise Exception('Filename [%s] could not be read from queue.' % filename) + + return loads(payload) + + raise Empty() + + def _purge(self, queue): + """Remove all messages from `queue`.""" + count = 0 + queue_find = '.' + queue + '.msg' + + folder = os.listdir(self.data_folder_in) + while len(folder) > 0: + filename = folder.pop() + try: + # only purge messages for the requested queue + if filename.find(queue_find) < 0: + continue + + filename = os.path.join(self.data_folder_in, filename) + os.remove(filename) + + count += 1 + + except OSError: + # we simply ignore its existence, as it was probably + # processed by another worker + pass + + return count + + def _size(self, queue): + """Return the number of messages in `queue` as an :class:`int`.""" + count = 0 + + queue_find = "." + queue + '.msg' + folder = os.listdir(self.data_folder_in) + while len(folder) > 0: + filename = folder.pop() + + # only handle message for the requested queue + if filename.find(queue_find) < 0: + continue + + count += 1 + + return count + + @property + def transport_options(self): + return self.connection.client.transport_options + + @cached_property + def data_folder_in(self): + return self.transport_options.get('data_folder_in', 'data_in') + + @cached_property + def data_folder_out(self): + return self.transport_options.get('data_folder_out', 'data_out') + + @cached_property + def store_processed(self): + return self.transport_options.get('store_processed', False) + + @cached_property + def processed_folder(self): + return self.transport_options.get('processed_folder', 'processed') + +class Transport(virtual.Transport): + Channel = Channel + + default_port = 0 + connection_errors = () + channel_errors = (StdChannelError, ) + + driver_type = 'filesystem' + driver_name = 'filesystem' + + def driver_version(self): + return 'N/A' |