1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
"""Kombu transport using a filesystem as the message store."""
from Queue import Empty
from anyjson import loads, dumps
import os
import shutil
import time
import uuid
import tempfile
from . import virtual
from kombu.exceptions import StdChannelError
from kombu.utils import cached_property
VERSION = (1, 0, 0)
__version__ = ".".join(map(str, VERSION))
# needs win32all to work on Windows
if os.name == 'nt':
import win32con, win32file, pywintypes
LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
LOCK_SH = 0 # the default
LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
__overlapped = pywintypes.OVERLAPPED()
def lock(file, flags):
hfile = win32file._get_osfhandle(file.fileno())
win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped)
def unlock(file):
hfile = win32file._get_osfhandle(file.fileno())
win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)
elif os.name == 'posix':
import fcntl
from fcntl import LOCK_EX, LOCK_SH, LOCK_NB
def lock(file, flags):
fcntl.flock(file.fileno(), flags)
def unlock(file):
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
else:
raise RuntimeError('Filesystem plugin only defined for nt and posix platforms')
class Channel(virtual.Channel):
def _put(self, queue, payload, **kwargs):
"""Put `message` onto `queue`."""
filename = '%s_%s.%s.msg' % (int(round(time.time()*1000)), uuid.uuid4(), queue)
filename = os.path.join(self.data_folder_out, filename)
try:
f = open(filename, 'wb')
lock(f, LOCK_EX)
f.write(dumps(payload))
except IOError, OSError:
raise Exception('Filename [%s] could not be placed into folder.' % filename)
finally:
unlock(f)
f.close()
def _get(self, queue):
"""Get next message from `queue`."""
queue_find = '.' + queue + '.msg'
folder = os.listdir(self.data_folder_in)
folder = sorted(folder)
while len(folder) > 0:
filename = folder.pop(0)
# only handle message for the requested queue
if filename.find(queue_find) < 0:
continue
if self.store_processed:
processed_folder = self.processed_folder
else:
processed_folder = tempfile.gettempdir()
try:
# move the file to the tmp/processed folder
shutil.move(os.path.join(self.data_folder_in, filename), processed_folder)
except IOError:
pass # file could be locked, or removed in meantime so ignore
filename = os.path.join(processed_folder, filename)
try:
f = open(filename, 'rb')
payload = f.read()
f.close()
if not self.store_processed:
os.remove(filename)
except IOError, OSError:
raise Exception('Filename [%s] could not be read from queue.' % filename)
return loads(payload)
raise Empty()
def _purge(self, queue):
"""Remove all messages from `queue`."""
count = 0
queue_find = '.' + queue + '.msg'
folder = os.listdir(self.data_folder_in)
while len(folder) > 0:
filename = folder.pop()
try:
# only purge messages for the requested queue
if filename.find(queue_find) < 0:
continue
filename = os.path.join(self.data_folder_in, filename)
os.remove(filename)
count += 1
except OSError:
# we simply ignore its existence, as it was probably
# processed by another worker
pass
return count
def _size(self, queue):
"""Return the number of messages in `queue` as an :class:`int`."""
count = 0
queue_find = "." + queue + '.msg'
folder = os.listdir(self.data_folder_in)
while len(folder) > 0:
filename = folder.pop()
# only handle message for the requested queue
if filename.find(queue_find) < 0:
continue
count += 1
return count
@property
def transport_options(self):
return self.connection.client.transport_options
@cached_property
def data_folder_in(self):
return self.transport_options.get('data_folder_in', 'data_in')
@cached_property
def data_folder_out(self):
return self.transport_options.get('data_folder_out', 'data_out')
@cached_property
def store_processed(self):
return self.transport_options.get('store_processed', False)
@cached_property
def processed_folder(self):
return self.transport_options.get('processed_folder', 'processed')
class Transport(virtual.Transport):
Channel = Channel
default_port = 0
connection_errors = ()
channel_errors = (StdChannelError, )
driver_type = 'filesystem'
driver_name = 'filesystem'
def driver_version(self):
return 'N/A'
|