summaryrefslogtreecommitdiff
path: root/kombu/transport/beanstalk.py
blob: 216a021ac48d5312a36fb9818007ed72b01aaf74 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
kombu.transport.beanstalk
=========================

Beanstalk transport.

:copyright: (c) 2010 - 2011 by David Ziegler.
:license: BSD, see LICENSE for more details.

"""
import socket

from Queue import Empty

from anyjson import serialize, deserialize
from beanstalkc import Connection, BeanstalkcException, SocketError

from kombu.transport import virtual

DEFAULT_PORT = 11300

__author__ = "David Ziegler <david.ziegler@gmail.com>"


class Channel(virtual.Channel):
    _client = None

    def _parse_job(self, job):
        item, dest = None, None
        if job:
            try:
                item = deserialize(job.body)
                dest = job.stats()["tube"]
            except Exception:
                job.bury()
            else:
                job.delete()
        else:
            raise Empty()
        return item, dest

    def _put(self, queue, message, **kwargs):
        priority = message["properties"]["delivery_info"]["priority"]
        self.client.use(queue)
        self.client.put(serialize(message), priority=priority)

    def _get(self, queue):
        if queue not in self.client.watching():
            self.client.watch(queue)

        [self.client.ignore(active)
            for active in self.client.watching()
                if active != queue]

        job = self.client.reserve(timeout=1)
        item, dest = self._parse_job(job)
        return item

    def _get_many(self, queues, timeout=1):
        # timeout of None will cause beanstalk to timeout waiting
        # for a new request
        if timeout is None:
            timeout = 1

        watching = self.client.watching()
        [self.client.watch(active)
            for active in queues
                if active not in watching]

        job = self.client.reserve(timeout=timeout)
        return self._parse_job(job)

    def _purge(self, queue):
        if queue not in self.client.watching():
            self.client.watch(queue)

        [self.client.ignore(active)
                for active in self.client.watching()
                    if active != queue]
        count = 0
        while 1:
            job = self.client.reserve(timeout=1)
            if job:
                job.delete()
                count += 1
            else:
                break
        return count

    def _size(self, queue):
        return 0

    def _open(self):
        conninfo = self.connection.client
        port = conninfo.port or DEFAULT_PORT
        conn = Connection(host=conninfo.hostname, port=port)
        conn.connect()
        return conn

    def close(self):
        if self._client is not None:
            return self._client.close()
        super(Channel, self).close()

    @property
    def client(self):
        if self._client is None:
            self._client = self._open()
        return self._client


class Transport(virtual.Transport):
    Channel = Channel

    interval = 1
    default_port = DEFAULT_PORT
    connection_errors = (socket.error,
                         SocketError,
                         IOError)
    channel_errors = (socket.error,
                      IOError,
                      SocketError,
                      BeanstalkcException)