summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-07-05 14:50:35 +0100
committerAsk Solem <ask@celeryproject.org>2012-07-05 14:50:35 +0100
commitd25a12868cae48d8799f3a42633238e8502ef296 (patch)
tree8a17699e3d003c3c3730006cb764854271ac6dfc
parent25d22f3a00c41340a96d77b5a5deba96359eacc4 (diff)
downloadkombu-d25a12868cae48d8799f3a42633238e8502ef296.tar.gz
Adds Bobby Beever to AUTHORS
-rw-r--r--AUTHORS1
-rw-r--r--kombu/tests/test_common.py1
-rw-r--r--kombu/tests/transport/test_filesystem.py37
-rw-r--r--kombu/transport/filesystem.py45
4 files changed, 57 insertions, 27 deletions
diff --git a/AUTHORS b/AUTHORS
index 12d57966..8aedb36d 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -11,6 +11,7 @@ Andrii Kostenko <andrey@kostenko.name>
Andy McCurdy <andy@andymccurdy.com>
Anton Gyllenberg <anton@iki.fi>
Ask Solem <ask@celeryproject.org>
+Bobby Beever <bobby.beever@yahoo.com>
Brian Bernstein
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 10f63cb4..a66ecf39 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -36,6 +36,7 @@ class test_maybe_declare(TestCase):
client.declared_entities = set()
entity = Mock()
entity.can_cache_declaration = True
+ entity.auto_delete = False
entity.is_bound = True
entity.channel = channel
diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py
index 547f5a70..7d004b2a 100644
--- a/kombu/tests/transport/test_filesystem.py
+++ b/kombu/tests/transport/test_filesystem.py
@@ -3,6 +3,8 @@ from __future__ import with_statement
import tempfile
+from nose import SkipTest
+
from kombu.connection import Connection
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer, Producer
@@ -13,10 +15,21 @@ from kombu.tests.utils import TestCase
class test_FilesystemTransport(TestCase):
def setUp(self):
- data_folder_in = tempfile.mkdtemp()
- data_folder_out = tempfile.mkdtemp()
- self.c = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_in, 'data_folder_out': data_folder_out})
- self.p = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_out, 'data_folder_out': data_folder_in})
+ try:
+ data_folder_in = tempfile.mkdtemp()
+ data_folder_out = tempfile.mkdtemp()
+ except Exception:
+ raise SkipTest('filesystem transport: cannot create tempfiles')
+ self.c = Connection(transport='filesystem',
+ transport_options={
+ 'data_folder_in': data_folder_in,
+ 'data_folder_out': data_folder_out,
+ })
+ self.p = Connection(transport='filesystem',
+ transport_options={
+ 'data_folder_in': data_folder_out,
+ 'data_folder_out': data_folder_in,
+ })
self.e = Exchange('test_transport_filesystem')
self.q = Queue('test_transport_filesystem',
exchange=self.e,
@@ -30,7 +43,8 @@ class test_FilesystemTransport(TestCase):
consumer = Consumer(self.c.channel(), self.q, no_ack=True)
for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_filesystem')
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem')
_received = []
@@ -56,9 +70,11 @@ class test_FilesystemTransport(TestCase):
self.q2(consumer_channel).declare()
for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_filesystem')
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem')
for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_filesystem2')
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem2')
_received1 = []
_received2 = []
@@ -93,7 +109,8 @@ class test_FilesystemTransport(TestCase):
# queue.delete
for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_filesystem')
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem')
self.assertTrue(self.q(consumer_channel).get())
self.q(consumer_channel).delete()
self.q(consumer_channel).declare()
@@ -101,8 +118,8 @@ class test_FilesystemTransport(TestCase):
# queue.purge
for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_filesystem2')
+ producer.publish({'foo': i},
+ routing_key='test_transport_filesystem2')
self.assertTrue(self.q2(consumer_channel).get())
self.q2(consumer_channel).purge()
self.assertIsNone(self.q2(consumer_channel).get())
-
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index bd8db8f6..193126da 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -1,4 +1,5 @@
"""Kombu transport using a filesystem as the message store."""
+from __future__ import absolute_import
from Queue import Empty
@@ -20,11 +21,14 @@ __version__ = ".".join(map(str, VERSION))
# needs win32all to work on Windows
if os.name == 'nt':
- import win32con, win32file, pywintypes
+ import win32con
+ import win32file
+ import pywintypes
LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
- LOCK_SH = 0 # the default
- LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
+ # 0 is the default
+ LOCK_SH = 0 # noqa
+ LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY # noqa
__overlapped = pywintypes.OVERLAPPED()
def lock(file, flags):
@@ -38,30 +42,34 @@ if os.name == 'nt':
elif os.name == 'posix':
import fcntl
- from fcntl import LOCK_EX, LOCK_SH, LOCK_NB
+ from fcntl import LOCK_EX, LOCK_SH, LOCK_NB # noqa
- def lock(file, flags):
+ def lock(file, flags): # noqa
fcntl.flock(file.fileno(), flags)
- def unlock(file):
+ def unlock(file): # noqa
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
else:
- raise RuntimeError('Filesystem plugin only defined for nt and posix platforms')
+ raise RuntimeError(
+ 'Filesystem plugin only defined for NT and POSIX platforms')
+
class Channel(virtual.Channel):
def _put(self, queue, payload, **kwargs):
"""Put `message` onto `queue`."""
- filename = '%s_%s.%s.msg' % (int(round(time.time()*1000)), uuid.uuid4(), queue)
+ filename = '%s_%s.%s.msg' % (int(round(time.time() * 1000)),
+ uuid.uuid4(), queue)
filename = os.path.join(self.data_folder_out, filename)
try:
f = open(filename, 'wb')
lock(f, LOCK_EX)
f.write(dumps(payload))
- except IOError, OSError:
- raise Exception('Filename [%s] could not be placed into folder.' % filename)
+ except (IOError, OSError):
+ raise StdChannelError(
+ 'Filename [%s] could not be placed into folder.' % filename)
finally:
unlock(f)
f.close()
@@ -86,9 +94,10 @@ class Channel(virtual.Channel):
try:
# move the file to the tmp/processed folder
- shutil.move(os.path.join(self.data_folder_in, filename), processed_folder)
+ shutil.move(os.path.join(self.data_folder_in, filename),
+ processed_folder)
except IOError:
- pass # file could be locked, or removed in meantime so ignore
+ pass # file could be locked, or removed in meantime so ignore
filename = os.path.join(processed_folder, filename)
try:
@@ -97,8 +106,9 @@ class Channel(virtual.Channel):
f.close()
if not self.store_processed:
os.remove(filename)
- except IOError, OSError:
- raise Exception('Filename [%s] could not be read from queue.' % filename)
+ except (IOError, OSError):
+ raise StdChannelError(
+ 'Filename [%s] could not be read from queue.' % filename)
return loads(payload)
@@ -123,9 +133,9 @@ class Channel(virtual.Channel):
count += 1
except OSError:
- # we simply ignore its existence, as it was probably
- # processed by another worker
- pass
+ # we simply ignore its existence, as it was probably
+ # processed by another worker
+ pass
return count
@@ -166,6 +176,7 @@ class Channel(virtual.Channel):
def processed_folder(self):
return self.transport_options.get('processed_folder', 'processed')
+
class Transport(virtual.Transport):
Channel = Channel