diff options
Diffstat (limited to 'kombu/transport/consul.py')
-rw-r--r-- | kombu/transport/consul.py | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py index 26dca7ac..980487ba 100644 --- a/kombu/transport/consul.py +++ b/kombu/transport/consul.py @@ -4,7 +4,6 @@ It uses Consul.io's Key/Value store to transport messages in Queues It uses python-consul for talking to Consul's HTTP API """ -from __future__ import absolute_import, unicode_literals import uuid import socket @@ -47,7 +46,7 @@ class Channel(virtual.Channel): if consul is None: raise ImportError('Missing python-consul library') - super(Channel, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) port = self.connection.client.port or self.connection.default_port host = self.connection.client.hostname or DEFAULT_HOST @@ -59,10 +58,10 @@ class Channel(virtual.Channel): self.client = consul.Consul(host=host, port=int(port)) def _lock_key(self, queue): - return '{0}/{1}.lock'.format(self.prefix, queue) + return f'{self.prefix}/{queue}.lock' def _key_prefix(self, queue): - return '{0}/{1}'.format(self.prefix, queue) + return f'{self.prefix}/{queue}' def _get_or_create_session(self, queue): """Get or create consul session. @@ -178,13 +177,13 @@ class Channel(virtual.Channel): This simply writes a key to the K/V store of Consul """ - key = '{0}/msg/{1}_{2}'.format( + key = '{}/msg/{}_{}'.format( self._key_prefix(queue), int(round(monotonic() * 1000)), uuid.uuid4(), ) if not self.client.kv.put(key=key, value=dumps(payload), cas=0): - raise ChannelError('Cannot add key {0!r} to consul'.format(key)) + raise ChannelError(f'Cannot add key {key!r} to consul') def _get(self, queue, timeout=None): """Get the first available message from the queue. @@ -193,7 +192,7 @@ class Channel(virtual.Channel): only one node reads at the same time. This is for read consistency """ with self._queue_lock(queue, raising=Empty): - key = '{0}/msg/'.format(self._key_prefix(queue)) + key = '{}/msg/'.format(self._key_prefix(queue)) logger.debug('Fetching key %s with index %s', key, self.index) self.index, data = self.client.kv.get( key=key, recurse=True, @@ -219,14 +218,14 @@ class Channel(virtual.Channel): def _purge(self, queue): self._destroy_session(queue) return self.client.kv.delete( - key='{0}/msg/'.format(self._key_prefix(queue)), + key='{}/msg/'.format(self._key_prefix(queue)), recurse=True, ) def _size(self, queue): size = 0 try: - key = '{0}/msg/'.format(self._key_prefix(queue)) + key = '{}/msg/'.format(self._key_prefix(queue)) logger.debug('Fetching key recursively %s with index %s', key, self.index) self.index, data = self.client.kv.get( @@ -243,7 +242,7 @@ class Channel(virtual.Channel): @cached_property def lock_name(self): - return '{0}'.format(socket.gethostname()) + return f'{socket.gethostname()}' class Transport(virtual.Transport): @@ -259,7 +258,7 @@ class Transport(virtual.Transport): if consul is None: raise ImportError('Missing python-consul library') - super(Transport, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.connection_errors = ( virtual.Transport.connection_errors + ( |