diff options
Diffstat (limited to 'moved/transport/pyro.py')
-rw-r--r-- | moved/transport/pyro.py | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/moved/transport/pyro.py b/moved/transport/pyro.py new file mode 100644 index 00000000..c52532aa --- /dev/null +++ b/moved/transport/pyro.py @@ -0,0 +1,94 @@ +"""Pyro transport. + +Requires the :mod:`Pyro4` library to be installed. +""" +import sys + +from kombu.utils.objects import cached_property + +from . import virtual + +try: + import Pyro4 as pyro + from Pyro4.errors import NamingError +except ImportError: # pragma: no cover + pyro = NamingError = None # noqa + +DEFAULT_PORT = 9090 +E_LOOKUP = """\ +Unable to locate pyro nameserver {0.virtual_host} on host {0.hostname}\ +""" + + +class Channel(virtual.Channel): + """Pyro Channel.""" + + def queues(self): + return self.shared_queues.get_queue_names() + + def _new_queue(self, queue, **kwargs): + if queue not in self.queues(): + self.shared_queues.new_queue(queue) + + def _get(self, queue, timeout=None): + queue = self._queue_for(queue) + msg = self.shared_queues._get(queue) + return msg + + def _queue_for(self, queue): + if queue not in self.queues(): + self.shared_queues.new_queue(queue) + return queue + + def _put(self, queue, message, **kwargs): + queue = self._queue_for(queue) + self.shared_queues._put(queue, message) + + def _size(self, queue): + return self.shared_queues._size(queue) + + def _delete(self, queue, *args, **kwargs): + self.shared_queues._delete(queue) + + def _purge(self, queue): + return self.shared_queues._purge(queue) + + def after_reply_message_received(self, queue): + ... + + @cached_property + def shared_queues(self): + return self.connection.shared_queues + + +class Transport(virtual.Transport): + """Pyro Transport.""" + + Channel = Channel + + #: memory backend state is global. + state = virtual.BrokerState() + + default_port = DEFAULT_PORT + + driver_type = driver_name = 'pyro' + + def _open(self): + conninfo = self.client + pyro.config.HMAC_KEY = conninfo.virtual_host + try: + nameserver = pyro.locateNS(host=conninfo.hostname, + port=self.default_port) + # name of registered pyro object + uri = nameserver.lookup(conninfo.virtual_host) + return pyro.Proxy(uri) + except NamingError: + raise NamingError(E_LOOKUP.format(conninfo)).with_traceback( + sys.exc_info()[2]) + + def driver_version(self): + return pyro.__version__ + + @cached_property + def shared_queues(self): + return self._open() |