path: root/kombu
diff options
authorStephen Milner <>2016-08-26 05:06:54 -0400
committerAsif Saifuddin Auvi <>2016-08-26 15:06:54 +0600
commit7fa3172ead722e9e35e05dc7f8c26c61a74d280b (patch)
treedb16ef937fca5ea6b03a8b781ac4ba7b5e4127b6 /kombu
parentc5886a33efd78955dabd8d6e1ad85d200c293605 (diff)
Add Etcd Transport (#619)
This transport is highly influenced by the Consul Transport. Messages are kept in keys within Etcd through the python-etcd library.
Diffstat (limited to 'kombu')
3 files changed, 362 insertions, 1 deletions
diff --git a/kombu/tests/transport/ b/kombu/tests/transport/
new file mode 100644
index 00000000..7994ec67
--- /dev/null
+++ b/kombu/tests/transport/
@@ -0,0 +1,67 @@
+from __future__ import absolute_import, unicode_literals
+import mock
+from kombu.five import Empty
+from kombu.transport.etcd import Channel, Transport
+from import Case, Mock, skip
+class test_Etcd(Case):
+ def setup(self):
+ self.connection = Mock()
+ self.connection.client.transport_options = {}
+ self.connection.client.port = 2739
+ self.client = self.patch('etcd.Client').return_value
+ = Channel(connection=self.connection)
+ def test_driver_version(self):
+ self.assertTrue(Transport(self.connection.client).driver_version())
+ def test_failed_get(self):
+ = Mock(return_value=False)
+ = IndexError
+ with mock.patch('etcd.Lock'):
+ with self.assertRaises(Empty):
+ def test_test_purge(self):
+ with mock.patch('etcd.Lock'):
+ self.client.delete = Mock(return_value=True)
+ self.assertTrue('foo'))
+ def test_key_prefix(self):
+ key ='myqueue')
+ self.assertEqual(key, 'kombu/myqueue')
+ def test_create_delete_queue(self):
+ queue = 'mynewqueue'
+ with mock.patch('etcd.Lock'):
+ self.client.write.return_value = self.patch('etcd.EtcdResult')
+ self.assertTrue(
+ self.client.delete.return_value = self.patch('etcd.EtcdResult')
+ def test_size(self):
+ with mock.patch('etcd.Lock'):
+ = self.patch(
+ 'etcd.EtcdResult', _children=[{}, {}])
+ self.assertEqual('q'), 2)
+ def test_get(self):
+ with mock.patch('etcd.Lock'):
+ = self.patch(
+ 'etcd.EtcdResult',
+ _children=[{'key': 'myqueue', 'modifyIndex': 1, 'value': '1'}])
+ self.assertIsNotNone('myqueue'))
+ def test_put(self):
+ with mock.patch('etcd.Lock'):
+ self.client.write.return_value = self.patch('etcd.EtcdResult')
+ self.assertIsNone('myqueue', 'mydata'))
diff --git a/kombu/transport/ b/kombu/transport/
index 57b67236..9455275e 100644
--- a/kombu/transport/
+++ b/kombu/transport/
@@ -31,7 +31,8 @@ TRANSPORT_ALIASES = {
'filesystem': 'kombu.transport.filesystem:Transport',
'qpid': 'kombu.transport.qpid:Transport',
'sentinel': 'kombu.transport.redis:SentinelTransport',
- 'consul': 'kombu.transport.consul:Transport'
+ 'consul': 'kombu.transport.consul:Transport',
+ 'etcd': 'kombu.transport.etcd:Transport',
_transport_cache = {}
diff --git a/kombu/transport/ b/kombu/transport/
new file mode 100644
index 00000000..afdd7763
--- /dev/null
+++ b/kombu/transport/
@@ -0,0 +1,293 @@
+Etcd Transport.
+It uses Etcd as a store to transport messages in Queues
+It uses python-etcd for talking to Etcd's HTTP API
+from __future__ import absolute_import, unicode_literals
+import os
+import socket
+from collections import defaultdict
+from contextlib import contextmanager
+from kombu.exceptions import ChannelError
+from kombu.five import Empty
+from kombu.log import get_logger
+from kombu.utils.json import loads, dumps
+from kombu.utils.objects import cached_property
+from . import virtual
+ import etcd
+except ImportError:
+ etcd = None
+logger = get_logger('kombu.transport.etcd')
+DEFAULT_HOST = 'localhost'
+class Channel(virtual.Channel):
+ """
+ Etcd Channel class which talks to the Etcd.
+ """
+ prefix = 'kombu'
+ index = None
+ timeout = 10
+ session_ttl = 30
+ lock_ttl = 10
+ def __init__(self, *args, **kwargs):
+ """
+ Creates a new instance of the etcd.Channel.
+ """
+ if etcd is None:
+ raise ImportError('Missing python-etcd library')
+ super(Channel, self).__init__(*args, **kwargs)
+ port = self.connection.client.port or self.connection.default_port
+ host = self.connection.client.hostname or DEFAULT_HOST
+ logger.debug('Host: %s Port: %s Timeout: %s', host, port, self.timeout)
+ self.queues = defaultdict(dict)
+ self.client = etcd.Client(host=host, port=int(port))
+ def _key_prefix(self, queue):
+ """
+ Creates and returns the `queue` with the proper prefix.
+ Arguments:
+ queue (str): The name of the queue.
+ """
+ return '{0}/{1}'.format(self.prefix, queue)
+ @contextmanager
+ def _queue_lock(self, queue):
+ """Try to acquire a lock on the Queue.
+ It does so by creating a object called 'lock' which is locked by the
+ current session..
+ This way other nodes are not able to write to the lock object which
+ means that they have to wait before the lock is released.
+ Arguments:
+ queue (str): The name of the queue.
+ """
+ lock = etcd.Lock(self.client, queue)
+ lock._uuid = self.lock_value
+ logger.debug('Acquiring lock {0}'.format(
+ lock.acquire(blocking=True, lock_ttl=self.lock_ttl)
+ try:
+ yield
+ finally:
+ logger.debug('Releasing lock {0}'.format(
+ lock.release()
+ def _new_queue(self, queue, **_):
+ """
+ Creates a new `queue` if the `queue` doesn't already exist.
+ Arguments:
+ queue (str): The name of the queue.
+ """
+ self.queues[queue] = queue
+ with self._queue_lock(queue):
+ try:
+ return self.client.write(
+ key=self._key_prefix(queue), dir=True, value=None)
+ except etcd.EtcdNotFile:
+ logger.debug('Queue "{0}" already exists'.format(queue))
+ return
+ def _has_queue(self, queue, **kwargs):
+ """
+ Verify that queue exists.
+ Returns:
+ bool: Should return :const:`True` if the queue exists
+ or :const:`False` otherwise.
+ """
+ try:
+ return True
+ except etcd.EtcdKeyNotFound:
+ return False
+ def _delete(self, queue, *args, **_):
+ """
+ Deletes a `queue`.
+ Arguments:
+ queue (str): The name of the queue.
+ """
+ self.queues.pop(queue, None)
+ self._purge(queue)
+ def _put(self, queue, payload, **_):
+ """
+ Put `message` onto `queue`.
+ This simply writes a key to the Etcd store
+ Arguments:
+ queue (str): The name of the queue.
+ payload (dict): Message data which will be dumped to etcd.
+ """
+ with self._queue_lock(queue):
+ key = self._key_prefix(queue)
+ if not self.client.write(
+ key=key,
+ value=dumps(payload),
+ append=True):
+ raise ChannelError('Cannot add key {0!r} to etcd'.format(key))
+ def _get(self, queue, timeout=None):
+ """
+ Get the first available message from the queue.
+ Before it does so it acquires a lock on the store so
+ only one node reads at the same time. This is for read consistency
+ Arguments:
+ queue (str): The name of the queue.
+ timeout (int): Optional seconds to wait for a response.
+ """
+ with self._queue_lock(queue):
+ key = self._key_prefix(queue)
+ logger.debug('Fetching key %s with index %s', key, self.index)
+ try:
+ result =
+ key=key, recursive=True,
+ index=self.index, timeout=self.timeout)
+ if result is None:
+ raise Empty()
+ item = result._children[-1]
+ logger.debug('Removing key {0}'.format(item['key']))
+ msg_content = loads(item['value'])
+ self.client.delete(key=item['key'])
+ return msg_content
+ except (TypeError, IndexError, etcd.EtcdError) as error:
+ logger.debug('_get failed: {0}:{1}'.format(type(error), error))
+ raise Empty()
+ def _purge(self, queue):
+ """
+ Removes all `message`s from a `queue`.
+ Arguments:
+ queue (str): The name of the queue.
+ """
+ with self._queue_lock(queue):
+ key = self._key_prefix(queue)
+ logger.debug('Purging queue at key {0}'.format(key))
+ return self.client.delete(key=key, recursive=True)
+ def _size(self, queue):
+ """
+ Returns the size of the `queue`.
+ Arguments:
+ queue (str): The name of the queue.
+ """
+ with self._queue_lock(queue):
+ size = 0
+ try:
+ key = self._key_prefix(queue)
+ logger.debug('Fetching key recursively %s with index %s',
+ key, self.index)
+ result =
+ key=key, recursive=True,
+ index=self.index)
+ size = len(result._children)
+ except TypeError:
+ pass
+ logger.debug('Found %s keys under %s with index %s',
+ size, key, self.index)
+ return size
+ @cached_property
+ def lock_value(self):
+ return '{0}.{1}'.format(socket.gethostname(), os.getpid())
+class Transport(virtual.Transport):
+ """
+ Etcd storage Transport for Kombu.
+ """
+ Channel = Channel
+ default_port = DEFAULT_PORT
+ driver_type = 'etcd'
+ driver_name = 'python-etcd'
+ polling_interval = 3
+ implements = virtual.Transport.implements.extend(
+ exchange_type=frozenset(['direct']))
+ def __init__(self, *args, **kwargs):
+ """
+ Creates a new instance of etcd.Transport.
+ """
+ if etcd is None:
+ raise ImportError('Missing python-etcd library')
+ super(Transport, self).__init__(*args, **kwargs)
+ self.connection_errors = (
+ virtual.Transport.connection_errors + (etcd.EtcdError, )
+ )
+ self.channel_errors = (
+ virtual.Transport.channel_errors + (etcd.EtcdError, )
+ )
+ def verify_connection(self, connection):
+ """
+ Verifies the connection works.
+ """
+ port = connection.client.port or self.default_port
+ host = connection.client.hostname or DEFAULT_HOST
+ logger.debug('Verify Etcd connection to %s:%s', host, port)
+ try:
+ etcd.Client(host=host, port=int(port))
+ return True
+ except ValueError:
+ pass
+ return False
+ def driver_version(self):
+ """
+ Returns the version of the etcd library.
+ .. note::
+ python-etcd has no __version__. This is a workaround.
+ """
+ try:
+ import pip.commands.freeze
+ for x in pip.commands.freeze.freeze():
+ if x.startswith('python-etcd'):
+ return x.split('==')[1]
+ except (ImportError, IndexError):
+ logger.warn('Unable to find the python-etcd version.')
+ return 'Unknown'