summaryrefslogtreecommitdiff
path: root/moved/transport/zookeeper.py
diff options
context:
space:
mode:
Diffstat (limited to 'moved/transport/zookeeper.py')
-rw-r--r--moved/transport/zookeeper.py198
1 files changed, 198 insertions, 0 deletions
diff --git a/moved/transport/zookeeper.py b/moved/transport/zookeeper.py
new file mode 100644
index 00000000..cf9f0dc8
--- /dev/null
+++ b/moved/transport/zookeeper.py
@@ -0,0 +1,198 @@
+"""Zookeeper transport.
+
+:copyright: (c) 2010 - 2013 by Mahendra M.
+:license: BSD, see LICENSE for more details.
+
+**Synopsis**
+
+Connects to a zookeeper node as <server>:<port>/<vhost>
+The <vhost> becomes the base for all the other znodes. So we can use
+it like a vhost.
+
+This uses the built-in kazoo recipe for queues
+
+**References**
+
+- https://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Queues
+- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html
+
+**Limitations**
+This queue does not offer reliable consumption. An entry is removed from
+the queue prior to being processed. So if an error occurs, the consumer
+has to re-queue the item or it will be lost.
+"""
+
+import os
+import socket
+
+from queue import Empty
+
+from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
+
+from . import virtual
+
+try:
+ import kazoo
+ from kazoo.client import KazooClient
+ from kazoo.recipe.queue import Queue
+
+ KZ_CONNECTION_ERRORS = (
+ kazoo.exceptions.SystemErrorException,
+ kazoo.exceptions.ConnectionLossException,
+ kazoo.exceptions.MarshallingErrorException,
+ kazoo.exceptions.UnimplementedException,
+ kazoo.exceptions.OperationTimeoutException,
+ kazoo.exceptions.NoAuthException,
+ kazoo.exceptions.InvalidACLException,
+ kazoo.exceptions.AuthFailedException,
+ kazoo.exceptions.SessionExpiredException,
+ )
+
+ KZ_CHANNEL_ERRORS = (
+ kazoo.exceptions.RuntimeInconsistencyException,
+ kazoo.exceptions.DataInconsistencyException,
+ kazoo.exceptions.BadArgumentsException,
+ kazoo.exceptions.MarshallingErrorException,
+ kazoo.exceptions.UnimplementedException,
+ kazoo.exceptions.OperationTimeoutException,
+ kazoo.exceptions.ApiErrorException,
+ kazoo.exceptions.NoNodeException,
+ kazoo.exceptions.NoAuthException,
+ kazoo.exceptions.NodeExistsException,
+ kazoo.exceptions.NoChildrenForEphemeralsException,
+ kazoo.exceptions.NotEmptyException,
+ kazoo.exceptions.SessionExpiredException,
+ kazoo.exceptions.InvalidCallbackException,
+ socket.error,
+ )
+except ImportError:
+ kazoo = None # noqa
+ KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = () # noqa
+
+DEFAULT_PORT = 2181
+
+__author__ = 'Mahendra M <mahendra.m@gmail.com>'
+
+
+class Channel(virtual.Channel):
+ """Zookeeper Channel."""
+
+ _client = None
+ _queues = {}
+
+ def _get_path(self, queue_name):
+ return os.path.join(self.vhost, queue_name)
+
+ def _get_queue(self, queue_name):
+ queue = self._queues.get(queue_name, None)
+
+ if queue is None:
+ queue = Queue(self.client, self._get_path(queue_name))
+ self._queues[queue_name] = queue
+
+ # Ensure that the queue is created
+ len(queue)
+
+ return queue
+
+ def _put(self, queue, message, **kwargs):
+ return self._get_queue(queue).put(
+ dumps(message),
+ priority=self._get_message_priority(message, reverse=True),
+ )
+
+ def _get(self, queue):
+ queue = self._get_queue(queue)
+ msg = queue.get()
+
+ if msg is None:
+ raise Empty()
+
+ return loads(bytes_to_str(msg))
+
+ def _purge(self, queue):
+ count = 0
+ queue = self._get_queue(queue)
+
+ while True:
+ msg = queue.get()
+ if msg is None:
+ break
+ count += 1
+
+ return count
+
+ def _delete(self, queue, *args, **kwargs):
+ if self._has_queue(queue):
+ self._purge(queue)
+ self.client.delete(self._get_path(queue))
+
+ def _size(self, queue):
+ queue = self._get_queue(queue)
+ return len(queue)
+
+ def _new_queue(self, queue, **kwargs):
+ if not self._has_queue(queue):
+ queue = self._get_queue(queue)
+
+ def _has_queue(self, queue):
+ return self.client.exists(self._get_path(queue)) is not None
+
+ def _open(self):
+ conninfo = self.connection.client
+ self.vhost = os.path.join('/', conninfo.virtual_host[0:-1])
+ hosts = []
+ if conninfo.alt:
+ for host_port in conninfo.alt:
+ if host_port.startswith('zookeeper://'):
+ host_port = host_port[len('zookeeper://'):]
+ if not host_port:
+ continue
+ try:
+ host, port = host_port.split(':', 1)
+ host_port = (host, int(port))
+ except ValueError:
+ if host_port == conninfo.hostname:
+ host_port = (host_port, conninfo.port or DEFAULT_PORT)
+ else:
+ host_port = (host_port, DEFAULT_PORT)
+ hosts.append(host_port)
+ host_port = (conninfo.hostname, conninfo.port or DEFAULT_PORT)
+ if host_port not in hosts:
+ hosts.insert(0, host_port)
+ conn_str = ','.join(['%s:%s' % (h, p) for h, p in hosts])
+ conn = KazooClient(conn_str)
+ conn.start()
+ return conn
+
+ @property
+ def client(self):
+ if self._client is None:
+ self._client = self._open()
+ return self._client
+
+
+class Transport(virtual.Transport):
+ """Zookeeper Transport."""
+
+ Channel = Channel
+ polling_interval = 1
+ default_port = DEFAULT_PORT
+ connection_errors = (
+ virtual.Transport.connection_errors + KZ_CONNECTION_ERRORS
+ )
+ channel_errors = (
+ virtual.Transport.channel_errors + KZ_CHANNEL_ERRORS
+ )
+ driver_type = 'zookeeper'
+ driver_name = 'kazoo'
+
+ def __init__(self, *args, **kwargs):
+ if kazoo is None:
+ raise ImportError('The kazoo library is not installed')
+
+ super().__init__(*args, **kwargs)
+
+ def driver_version(self):
+ return kazoo.__version__