author     Ask Solem <ask@celeryproject.org>   2012-07-05 14:37:37 +0100
committer  Ask Solem <ask@celeryproject.org>   2012-07-05 14:37:37 +0100
commit     25d22f3a00c41340a96d77b5a5deba96359eacc4 (patch)
tree       db93bd82cfd5d9650b18a867831105949fc8e434
parent     b4957cedf61ce8263e11fde649b5240e5724d710 (diff)
parent     aa973a59293566cbde841d189dff762536288fcf (diff)
download   kombu-25d22f3a00c41340a96d77b5a5deba96359eacc4.tar.gz
Merge branch 'bobbybeever/master'
-rw-r--r--  kombu/tests/transport/test_filesystem.py  | 108
-rw-r--r--  kombu/transport/__init__.py                |   1
-rw-r--r--  kombu/transport/filesystem.py              | 180
3 files changed, 289 insertions, 0 deletions
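
Note: this merge registers a 'filesystem' transport alias backed by the new kombu/transport/filesystem.py, which stores each message as a file in a pair of directories and locks files while writing. A minimal configuration sketch follows (the directory names are just the transport's built-in defaults, and the full produce/consume round trip is shown after the diff below):

    from kombu.connection import Connection

    # Each side reads from its data_folder_in and writes to its data_folder_out,
    # so two communicating connections swap the two directories (see the tests below).
    connection = Connection(transport='filesystem',
                            transport_options={'data_folder_in': 'data_in',
                                               'data_folder_out': 'data_out'})
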
diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py
new file mode 100644
index 00000000..547f5a70
--- /dev/null
+++ b/kombu/tests/transport/test_filesystem.py
@@ -0,0 +1,108 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import tempfile
+
+from kombu.connection import Connection
+from kombu.entity import Exchange, Queue
+from kombu.messaging import Consumer, Producer
+
+from kombu.tests.utils import TestCase
+
+
+class test_FilesystemTransport(TestCase):
+
+    def setUp(self):
+        data_folder_in = tempfile.mkdtemp()
+        data_folder_out = tempfile.mkdtemp()
+        self.c = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_in, 'data_folder_out': data_folder_out})
+        self.p = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_out, 'data_folder_out': data_folder_in})
+        self.e = Exchange('test_transport_filesystem')
+        self.q = Queue('test_transport_filesystem',
+                       exchange=self.e,
+                       routing_key='test_transport_filesystem')
+        self.q2 = Queue('test_transport_filesystem2',
+                        exchange=self.e,
+                        routing_key='test_transport_filesystem2')
+
+    def test_produce_consume_noack(self):
+        producer = Producer(self.p.channel(), self.e)
+        consumer = Consumer(self.c.channel(), self.q, no_ack=True)
+
+        for i in range(10):
+            producer.publish({'foo': i}, routing_key='test_transport_filesystem')
+
+        _received = []
+
+        def callback(message_data, message):
+            _received.append(message)
+
+        consumer.register_callback(callback)
+        consumer.consume()
+
+        while 1:
+            if len(_received) == 10:
+                break
+            self.c.drain_events()
+
+        self.assertEqual(len(_received), 10)
+
+    def test_produce_consume(self):
+        producer_channel = self.p.channel()
+        consumer_channel = self.c.channel()
+        producer = Producer(producer_channel, self.e)
+        consumer1 = Consumer(consumer_channel, self.q)
+        consumer2 = Consumer(consumer_channel, self.q2)
+        self.q2(consumer_channel).declare()
+
+        for i in range(10):
+            producer.publish({'foo': i}, routing_key='test_transport_filesystem')
+        for i in range(10):
+            producer.publish({'foo': i}, routing_key='test_transport_filesystem2')
+
+        _received1 = []
+        _received2 = []
+
+        def callback1(message_data, message):
+            _received1.append(message)
+            message.ack()
+
+        def callback2(message_data, message):
+            _received2.append(message)
+            message.ack()
+
+        consumer1.register_callback(callback1)
+        consumer2.register_callback(callback2)
+
+        consumer1.consume()
+        consumer2.consume()
+
+        while 1:
+            if len(_received1) + len(_received2) == 20:
+                break
+            self.c.drain_events()
+
+        self.assertEqual(len(_received1) + len(_received2), 20)
+
+        # compression
+        producer.publish({'compressed': True},
+                         routing_key='test_transport_filesystem',
+                         compression='zlib')
+        m = self.q(consumer_channel).get()
+        self.assertDictEqual(m.payload, {'compressed': True})
+
+        # queue.delete
+        for i in range(10):
+            producer.publish({'foo': i}, routing_key='test_transport_filesystem')
+        self.assertTrue(self.q(consumer_channel).get())
+        self.q(consumer_channel).delete()
+        self.q(consumer_channel).declare()
+        self.assertIsNone(self.q(consumer_channel).get())
+
+        # queue.purge
+        for i in range(10):
+            producer.publish({'foo': i}, routing_key='test_transport_filesystem2')
+        self.assertTrue(self.q2(consumer_channel).get())
+        self.q2(consumer_channel).purge()
+        self.assertIsNone(self.q2(consumer_channel).get())
+
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 7605cd83..f3d85ad5 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -69,6 +69,7 @@ TRANSPORT_ALIASES = {
     'ghettoq.taproot.MongoDB': _ghettoq('MongoDB', 'mongodb'),
     'ghettoq.taproot.Beanstalk': _ghettoq('Beanstalk', 'beanstalk'),
     'ghettoq.taproot.CouchDB': _ghettoq('CouchDB', 'couchdb'),
+    'filesystem': 'kombu.transport.filesystem.Transport',
 }
 
 _transport_cache = {}
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
new file mode 100644
index 00000000..bd8db8f6
--- /dev/null
+++ b/kombu/transport/filesystem.py
@@ -0,0 +1,180 @@
+"""Kombu transport using a filesystem as the message store."""
+
+from Queue import Empty
+
+from anyjson import loads, dumps
+
+import os
+import shutil
+import time
+import uuid
+import tempfile
+
+from . import virtual
+from kombu.exceptions import StdChannelError
+from kombu.utils import cached_property
+
+VERSION = (1, 0, 0)
+__version__ = ".".join(map(str, VERSION))
+
+# needs win32all to work on Windows
+if os.name == 'nt':
+
+    import win32con, win32file, pywintypes
+
+    LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
+    LOCK_SH = 0  # the default
+    LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
+    __overlapped = pywintypes.OVERLAPPED()
+
+    def lock(file, flags):
+        hfile = win32file._get_osfhandle(file.fileno())
+        win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped)
+
+    def unlock(file):
+        hfile = win32file._get_osfhandle(file.fileno())
+        win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)
+
+elif os.name == 'posix':
+
+    import fcntl
+    from fcntl import LOCK_EX, LOCK_SH, LOCK_NB
+
+    def lock(file, flags):
+        fcntl.flock(file.fileno(), flags)
+
+    def unlock(file):
+        fcntl.flock(file.fileno(), fcntl.LOCK_UN)
+else:
+    raise RuntimeError('Filesystem plugin only defined for nt and posix platforms')
+
+class Channel(virtual.Channel):
+
+    def _put(self, queue, payload, **kwargs):
+        """Put `message` onto `queue`."""
+
+        filename = '%s_%s.%s.msg' % (int(round(time.time()*1000)), uuid.uuid4(), queue)
+        filename = os.path.join(self.data_folder_out, filename)
+
+        try:
+            f = open(filename, 'wb')
+            lock(f, LOCK_EX)
+            f.write(dumps(payload))
+        except (IOError, OSError):
+            raise Exception('Filename [%s] could not be placed into folder.' % filename)
+        finally:
+            unlock(f)
+            f.close()
+
+    def _get(self, queue):
+        """Get next message from `queue`."""
+
+        queue_find = '.' + queue + '.msg'
+        folder = os.listdir(self.data_folder_in)
+        folder = sorted(folder)
+        while len(folder) > 0:
+            filename = folder.pop(0)
+
+            # only handle message for the requested queue
+            if filename.find(queue_find) < 0:
+                continue
+
+            if self.store_processed:
+                processed_folder = self.processed_folder
+            else:
+                processed_folder = tempfile.gettempdir()
+
+            try:
+                # move the file to the tmp/processed folder
+                shutil.move(os.path.join(self.data_folder_in, filename), processed_folder)
+            except IOError:
+                pass  # file could be locked, or removed in meantime so ignore
+
+            filename = os.path.join(processed_folder, filename)
+            try:
+                f = open(filename, 'rb')
+                payload = f.read()
+                f.close()
+                if not self.store_processed:
+                    os.remove(filename)
+            except (IOError, OSError):
+                raise Exception('Filename [%s] could not be read from queue.' % filename)
+
+            return loads(payload)
+
+        raise Empty()
+
+    def _purge(self, queue):
+        """Remove all messages from `queue`."""
+        count = 0
+        queue_find = '.' + queue + '.msg'
+
+        folder = os.listdir(self.data_folder_in)
+        while len(folder) > 0:
+            filename = folder.pop()
+            try:
+                # only purge messages for the requested queue
+                if filename.find(queue_find) < 0:
+                    continue
+
+                filename = os.path.join(self.data_folder_in, filename)
+                os.remove(filename)
+
+                count += 1
+
+            except OSError:
+                # we simply ignore its existence, as it was probably
+                # processed by another worker
+                pass
+
+        return count
+
+    def _size(self, queue):
+        """Return the number of messages in `queue` as an :class:`int`."""
+        count = 0
+
+        queue_find = '.' + queue + '.msg'
+        folder = os.listdir(self.data_folder_in)
+        while len(folder) > 0:
+            filename = folder.pop()
+
+            # only handle message for the requested queue
+            if filename.find(queue_find) < 0:
+                continue
+
+            count += 1
+
+        return count
+
+    @property
+    def transport_options(self):
+        return self.connection.client.transport_options
+
+    @cached_property
+    def data_folder_in(self):
+        return self.transport_options.get('data_folder_in', 'data_in')
+
+    @cached_property
+    def data_folder_out(self):
+        return self.transport_options.get('data_folder_out', 'data_out')
+
+    @cached_property
+    def store_processed(self):
+        return self.transport_options.get('store_processed', False)
+
+    @cached_property
+    def processed_folder(self):
+        return self.transport_options.get('processed_folder', 'processed')
+
+class Transport(virtual.Transport):
+    Channel = Channel
+
+    default_port = 0
+    connection_errors = ()
+    channel_errors = (StdChannelError, )
+
+    driver_type = 'filesystem'
+    driver_name = 'filesystem'
+
+    def driver_version(self):
+        return 'N/A'
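
For reference, a minimal produce/consume round trip with the new transport, modelled on the tests added in this merge. The exchange/queue name, temporary directories, and callback are illustrative placeholders only, not part of this change:

    import tempfile

    from kombu.connection import Connection
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Consumer, Producer

    data_a = tempfile.mkdtemp()
    data_b = tempfile.mkdtemp()

    exchange = Exchange('example')
    queue = Queue('example', exchange=exchange, routing_key='example')

    # The producer's data_folder_out is the consumer's data_folder_in, and vice versa.
    producer_conn = Connection(transport='filesystem',
                               transport_options={'data_folder_in': data_b,
                                                  'data_folder_out': data_a})
    consumer_conn = Connection(transport='filesystem',
                               transport_options={'data_folder_in': data_a,
                                                  'data_folder_out': data_b})

    producer = Producer(producer_conn.channel(), exchange)
    producer.publish({'hello': 'world'}, routing_key='example')

    received = []

    def on_message(body, message):
        received.append(body)
        message.ack()

    consumer = Consumer(consumer_conn.channel(), queue)
    consumer.register_callback(on_message)
    consumer.consume()
    while not received:
        consumer_conn.drain_events()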