summaryrefslogtreecommitdiff
path: root/kombu/transport/consul.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/consul.py')
-rw-r--r--kombu/transport/consul.py21
1 files changed, 10 insertions, 11 deletions
diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py
index 26dca7ac..980487ba 100644
--- a/kombu/transport/consul.py
+++ b/kombu/transport/consul.py
@@ -4,7 +4,6 @@ It uses Consul.io's Key/Value store to transport messages in Queues
It uses python-consul for talking to Consul's HTTP API
"""
-from __future__ import absolute_import, unicode_literals
import uuid
import socket
@@ -47,7 +46,7 @@ class Channel(virtual.Channel):
if consul is None:
raise ImportError('Missing python-consul library')
- super(Channel, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
port = self.connection.client.port or self.connection.default_port
host = self.connection.client.hostname or DEFAULT_HOST
@@ -59,10 +58,10 @@ class Channel(virtual.Channel):
self.client = consul.Consul(host=host, port=int(port))
def _lock_key(self, queue):
- return '{0}/{1}.lock'.format(self.prefix, queue)
+ return f'{self.prefix}/{queue}.lock'
def _key_prefix(self, queue):
- return '{0}/{1}'.format(self.prefix, queue)
+ return f'{self.prefix}/{queue}'
def _get_or_create_session(self, queue):
"""Get or create consul session.
@@ -178,13 +177,13 @@ class Channel(virtual.Channel):
This simply writes a key to the K/V store of Consul
"""
- key = '{0}/msg/{1}_{2}'.format(
+ key = '{}/msg/{}_{}'.format(
self._key_prefix(queue),
int(round(monotonic() * 1000)),
uuid.uuid4(),
)
if not self.client.kv.put(key=key, value=dumps(payload), cas=0):
- raise ChannelError('Cannot add key {0!r} to consul'.format(key))
+ raise ChannelError(f'Cannot add key {key!r} to consul')
def _get(self, queue, timeout=None):
"""Get the first available message from the queue.
@@ -193,7 +192,7 @@ class Channel(virtual.Channel):
only one node reads at the same time. This is for read consistency
"""
with self._queue_lock(queue, raising=Empty):
- key = '{0}/msg/'.format(self._key_prefix(queue))
+ key = '{}/msg/'.format(self._key_prefix(queue))
logger.debug('Fetching key %s with index %s', key, self.index)
self.index, data = self.client.kv.get(
key=key, recurse=True,
@@ -219,14 +218,14 @@ class Channel(virtual.Channel):
def _purge(self, queue):
self._destroy_session(queue)
return self.client.kv.delete(
- key='{0}/msg/'.format(self._key_prefix(queue)),
+ key='{}/msg/'.format(self._key_prefix(queue)),
recurse=True,
)
def _size(self, queue):
size = 0
try:
- key = '{0}/msg/'.format(self._key_prefix(queue))
+ key = '{}/msg/'.format(self._key_prefix(queue))
logger.debug('Fetching key recursively %s with index %s',
key, self.index)
self.index, data = self.client.kv.get(
@@ -243,7 +242,7 @@ class Channel(virtual.Channel):
@cached_property
def lock_name(self):
- return '{0}'.format(socket.gethostname())
+ return f'{socket.gethostname()}'
class Transport(virtual.Transport):
@@ -259,7 +258,7 @@ class Transport(virtual.Transport):
if consul is None:
raise ImportError('Missing python-consul library')
- super(Transport, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self.connection_errors = (
virtual.Transport.connection_errors + (