summaryrefslogtreecommitdiff
path: root/kombu/transport/pyro.py
blob: 7e022273118db6a0f1acc59137b8b16fb95f2998 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Pyro transport, and Kombu Broker daemon.

Requires the :mod:`Pyro4` library to be installed.

To use the Pyro transport with Kombu, use a URL of the form:
``pyro://localhost/kombu.broker``

The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``
"""

from __future__ import absolute_import, unicode_literals

import sys

from kombu.five import reraise, Queue, Empty
from kombu.utils.objects import cached_property
from kombu.log import get_logger
from . import virtual

try:
    import Pyro4 as pyro
    from Pyro4.errors import NamingError
    from Pyro4.util import SerializerBase
except ImportError:          # pragma: no cover
    pyro = NamingError = SerializerBase = None  # noqa

# Default Pyro name server port used by the transport.
DEFAULT_PORT = 9090

# Error message templates; each one is formatted with the connection
# object as positional argument 0.
E_NAMESERVER = (
    "Unable to locate pyro nameserver on host {0.hostname}"
)
E_LOOKUP = (
    "Unable to lookup '{0.virtual_host}' in pyro nameserver "
    "on host {0.hostname}"
)

# Module-level logger, named after this module.
logger = get_logger(__name__)


class Channel(virtual.Channel):
    """Pyro Channel.

    Every queue operation is forwarded to the broker proxy that the
    transport exposes as ``shared_queues``.
    """

    def close(self):
        """Close the channel, then release the broker proxy if it is truthy."""
        super(Channel, self).close()
        if self.shared_queues:
            self.shared_queues._pyroRelease()

    def queues(self):
        """Return the queue names reported by the broker."""
        return self.shared_queues.get_queue_names()

    def _new_queue(self, queue, **kwargs):
        """Make sure ``queue`` exists on the broker.

        Extra keyword arguments are accepted and ignored.
        """
        # ``KombuBroker.new_queue`` silently ignores names that already
        # exist, so no client-side existence check is needed.  Skipping it
        # saves a remote call that transferred every queue name, and avoids
        # a check-then-create race between clients.
        self.shared_queues.new_queue(queue)

    def _has_queue(self, queue, **kwargs):
        """Return the broker's answer to whether ``queue`` exists."""
        return self.shared_queues.has_queue(queue)

    def _get(self, queue, timeout=None):
        """Fetch one message from ``queue``, creating the queue if needed.

        ``timeout`` is accepted for interface compatibility but is not
        used; the result (or exception) of the broker's ``get`` is passed
        through unchanged.
        """
        queue = self._queue_for(queue)
        return self.shared_queues.get(queue)

    def _queue_for(self, queue):
        """Ensure ``queue`` exists on the broker and return its name."""
        self._new_queue(queue)
        return queue

    def _put(self, queue, message, **kwargs):
        """Append ``message`` to ``queue``, creating the queue if needed."""
        queue = self._queue_for(queue)
        self.shared_queues.put(queue, message)

    def _size(self, queue):
        """Return the size of ``queue`` as reported by the broker."""
        return self.shared_queues.size(queue)

    def _delete(self, queue, *args, **kwargs):
        """Delete ``queue`` on the broker; extra arguments are ignored."""
        self.shared_queues.delete(queue)

    def _purge(self, queue):
        """Purge ``queue`` on the broker and return the broker's result."""
        return self.shared_queues.purge(queue)

    def after_reply_message_received(self, queue):
        """Do nothing; this transport has no work to do at this hook."""
        pass

    @cached_property
    def shared_queues(self):
        """Broker proxy taken from the connection, cached on first access."""
        return self.connection.shared_queues


class Transport(virtual.Transport):
    """Pyro Transport.

    Locates a Pyro name server on the connection's host, looks up the
    broker object registered under the connection's virtual host name and
    exposes a proxy to it as ``shared_queues``.
    """

    Channel = Channel

    #: broker state object, defined on the class and therefore shared by
    #: all instances of this transport.
    state = virtual.BrokerState()

    default_port = DEFAULT_PORT

    driver_type = driver_name = 'pyro'

    def _open(self):
        """Return a Pyro proxy for the broker named by the connection.

        Raises:
            NamingError: if the name server cannot be located, or if the
                connection's virtual host is not registered with it.  The
                original traceback is preserved.
        """
        logger.debug("trying Pyro nameserver to find the broker daemon")
        conninfo = self.client
        # Honour a port given on the connection; fall back to the default
        # name server port when none was specified.
        ns_port = conninfo.port or self.default_port
        try:
            nameserver = pyro.locateNS(host=conninfo.hostname, port=ns_port)
        except NamingError:
            reraise(NamingError, NamingError(E_NAMESERVER.format(conninfo)),
                    sys.exc_info()[2])
        try:
            # The name server proxy is only needed for this one lookup;
            # using it as a context manager releases it afterwards
            # instead of leaking it.
            with nameserver:
                # name of registered pyro object
                uri = nameserver.lookup(conninfo.virtual_host)
            return pyro.Proxy(uri)
        except NamingError:
            reraise(NamingError, NamingError(E_LOOKUP.format(conninfo)),
                    sys.exc_info()[2])

    def driver_version(self):
        """Return the version string of the Pyro library."""
        return pyro.__version__

    @cached_property
    def shared_queues(self):
        """Broker proxy, created by :meth:`_open` on first access."""
        return self._open()


if pyro is not None:
    # Register a converter so that serialized data tagged ``queue.Empty``
    # is turned back into an ``Empty`` exception instance.
    SerializerBase.register_dict_to_class("queue.Empty",
                                          lambda cls, data: Empty())

    @pyro.expose
    @pyro.behavior(instance_mode="single")
    class KombuBroker(object):
        """Kombu Broker used by the Pyro transport.

        You have to run this as a separate (Pyro) service.

        Queues are kept in a dict that maps each queue name to a
        ``Queue`` object.
        """

        def __init__(self):
            self.queues = {}

        def get_queue_names(self):
            """Return a list with the names of all existing queues."""
            return list(self.queues)

        def new_queue(self, queue):
            """Create ``queue`` unless a queue with that name exists."""
            if queue in self.queues:
                return   # silently ignore the fact that queue already exists
            self.queues[queue] = Queue()

        def has_queue(self, queue):
            """Return True if a queue named ``queue`` exists."""
            return queue in self.queues

        def get(self, queue):
            """Remove and return the next message from ``queue``.

            Never blocks.

            Raises:
                Empty: if the queue holds no messages.
                KeyError: if the queue does not exist.
            """
            return self.queues[queue].get(block=False)

        def put(self, queue, message):
            """Append ``message`` to ``queue`` (``KeyError`` if unknown)."""
            self.queues[queue].put(message)

        def size(self, queue):
            """Return the number of messages currently in ``queue``."""
            return self.queues[queue].qsize()

        def delete(self, queue):
            """Remove ``queue`` entirely (``KeyError`` if unknown)."""
            del self.queues[queue]

        def purge(self, queue):
            """Discard every message in ``queue``.

            Returns:
                int: the number of messages that were removed.

            Raises:
                KeyError: if the queue does not exist.
            """
            target = self.queues[queue]
            purged = 0
            while True:
                try:
                    # The keyword is ``block``; the former ``blocking``
                    # made every purge fail with TypeError.
                    target.get(block=False)
                except Empty:
                    return purged
                purged += 1


# Executing this module as a script, i.e.
# ``python -m kombu.transport.pyro``, launches a Kombu Broker daemon.
if __name__ == "__main__":
    print("Launching Broker for Kombu's Pyro transport.")
    with pyro.Daemon() as daemon:
        ns_location = "(Expecting a Pyro name server at {0}:{1})".format(
            pyro.config.NS_HOST, pyro.config.NS_PORT)
        print(ns_location)
        with pyro.locateNS() as ns:
            connect_hint = ("You can connect with Kombu using the url "
                            "'pyro://{0}/kombu.broker'")
            print(connect_hint.format(pyro.config.NS_HOST))
            # Register the broker class with the daemon, then publish the
            # resulting URI on the name server as ``kombu.broker``.
            broker_uri = daemon.register(KombuBroker)
            ns.register("kombu.broker", broker_uri)
        # The name server proxy has been released at this point; enter
        # the daemon's request loop.
        daemon.requestLoop()