1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""
kombu.transport.beanstalk
=========================
Beanstalk transport.
:copyright: (c) 2010 - 2011 by David Ziegler.
:license: BSD, see LICENSE for more details.
"""
import socket
from Queue import Empty
from anyjson import serialize, deserialize
from beanstalkc import Connection, BeanstalkcException, SocketError
from kombu.transport import virtual
# Port used by Channel._open when the connection info does not specify one.
DEFAULT_PORT = 11300
__author__ = "David Ziegler <david.ziegler@gmail.com>"
class Channel(virtual.Channel):
    """Virtual channel that stores messages in beanstalk tubes.

    Every queue name is used directly as a beanstalk tube name.  The
    beanstalk connection is created lazily the first time :attr:`client`
    is accessed.
    """

    # Cached beanstalk connection; stays None until `client` is first used.
    _client = None

    def _parse_job(self, job):
        """Decode a reserved job and remove it from its tube.

        :param job: object returned by ``client.reserve()``; falsy when no
            job was available.
        :returns: ``(message, tube_name)`` tuple.  If decoding the body or
            reading the job stats raises, the job is buried instead of
            deleted and whichever value was not obtained stays ``None``.
        :raises Empty: if ``job`` is falsy.
        """
        if not job:
            raise Empty()

        item, dest = None, None
        try:
            item = deserialize(job.body)
            dest = job.stats()["tube"]
        except Exception:
            # Bury rather than delete a job that could not be processed.
            job.bury()
        else:
            job.delete()
        return item, dest

    def _watch_only(self, queue):
        """Make ``queue`` the only tube watched by the client."""
        if queue not in self.client.watching():
            self.client.watch(queue)
        for active in self.client.watching():
            if active != queue:
                self.client.ignore(active)

    def _put(self, queue, message, **kwargs):
        """Serialize ``message`` and put it on the tube named ``queue``.

        The job priority is read from
        ``message["properties"]["delivery_info"]["priority"]``.
        """
        priority = message["properties"]["delivery_info"]["priority"]
        self.client.use(queue)
        self.client.put(serialize(message), priority=priority)

    def _get(self, queue):
        """Reserve and return one message from ``queue``.

        :raises Empty: if no job could be reserved within one second.
        """
        self._watch_only(queue)
        job = self.client.reserve(timeout=1)
        item, _ = self._parse_job(job)
        return item

    def _get_many(self, queues, timeout=1):
        """Reserve one message from any of ``queues``.

        :param queues: iterable of tube names to watch.
        :param timeout: seconds to wait for a job; ``None`` is treated
            as ``1``.
        :returns: ``(message, tube_name)`` as produced by
            :meth:`_parse_job`.
        :raises Empty: if no job could be reserved within the timeout.
        """
        # timeout of None will cause beanstalk to timeout waiting
        # for a new request
        if timeout is None:
            timeout = 1

        watching = self.client.watching()
        for active in queues:
            if active not in watching:
                self.client.watch(active)

        job = self.client.reserve(timeout=timeout)
        return self._parse_job(job)

    def _purge(self, queue):
        """Delete every job that can be reserved from ``queue``.

        :returns: number of jobs deleted.
        """
        self._watch_only(queue)
        count = 0
        while True:
            job = self.client.reserve(timeout=1)
            if not job:
                break
            job.delete()
            count += 1
        return count

    def _size(self, queue):
        """Return the queue size; this transport always reports ``0``."""
        return 0

    def _open(self):
        """Create and connect a new beanstalk connection.

        Host and port are taken from ``self.connection.client``; the port
        falls back to ``DEFAULT_PORT`` when unset.
        """
        conninfo = self.connection.client
        port = conninfo.port or DEFAULT_PORT
        conn = Connection(host=conninfo.hostname, port=port)
        conn.connect()
        return conn

    def close(self):
        """Close the channel and the beanstalk connection, if one is open.

        The parent ``close()`` always runs, and runs first, so that any
        cleanup it performs can still use the connection.  The connection
        is then closed and the cached reference dropped, even if the
        parent ``close()`` raised.
        """
        try:
            super(Channel, self).close()
        finally:
            if self._client is not None:
                self._client.close()
                self._client = None

    @property
    def client(self):
        """Beanstalk connection, opened on first access and then cached."""
        if self._client is None:
            self._client = self._open()
        return self._client
class Transport(virtual.Transport):
    """Beanstalk transport; uses :class:`Channel` for its channels."""

    Channel = Channel

    # NOTE(review): presumably the polling interval in seconds used by the
    # virtual transport base class -- confirm against kombu.transport.virtual.
    interval = 1
    default_port = DEFAULT_PORT

    # Exception types declared as connection-level and channel-level errors.
    connection_errors = (socket.error, SocketError, IOError)
    channel_errors = (socket.error, IOError, SocketError, BeanstalkcException)
|