diff options
Diffstat (limited to 'kombu/transport')
-rw-r--r-- | kombu/transport/SLMQ.py | 5 | ||||
-rw-r--r-- | kombu/transport/amqplib.py | 2 | ||||
-rw-r--r-- | kombu/transport/librabbitmq.py | 3 | ||||
-rw-r--r-- | kombu/transport/memory.py | 6 | ||||
-rw-r--r-- | kombu/transport/mongodb.py | 2 | ||||
-rw-r--r-- | kombu/transport/pyamqp.py | 3 | ||||
-rw-r--r-- | kombu/transport/virtual/__init__.py | 2 | ||||
-rw-r--r-- | kombu/transport/zmq.py | 2 |
8 files changed, 13 insertions, 12 deletions
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 75d8f0b8..c9acc71a 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -10,13 +10,12 @@ from __future__ import absolute_import import socket import string -from Queue import Empty - from anyjson import loads, dumps import softlayer_messaging import os +from kombu.five import Empty, text_t from kombu.utils import cached_property # , uuid from kombu.utils.encoding import safe_str @@ -54,7 +53,7 @@ class Channel(virtual.Channel): def entity_name(self, name, table=CHARS_REPLACE_TABLE): """Format AMQP queue name into a valid SLQS queue name.""" - return unicode(safe_str(name)).translate(table) + return text_t(safe_str(name)).translate(table) def _new_queue(self, queue, **kwargs): """Ensures a queue exists in SLQS.""" diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 99d4de65..00099694 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -331,7 +331,7 @@ class Transport(base.Transport): def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client - for name, default_value in self.default_connection_params.items(): + for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) if conninfo.hostname == 'localhost': diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index acfa33e7..e3211d65 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -22,6 +22,7 @@ except ImportError: raise ImportError("No module named librabbitmq") from kombu.exceptions import StdConnectionError, StdChannelError +from kombu.five import items from kombu.utils.amq_manager import get_manager from . import base @@ -95,7 +96,7 @@ class Transport(base.Transport): def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client - for name, default_value in self.default_connection_params.items(): + for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) conn = self.Connection(host=conninfo.host, diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 514b361a..2b763726 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -7,7 +7,7 @@ In-memory transport. """ from __future__ import absolute_import -from kombu.five import Queue +from kombu.five import Queue, values from . import virtual @@ -36,7 +36,7 @@ class Channel(virtual.Channel): pass def _put_fanout(self, exchange, message, **kwargs): - for queue in self.queues.values(): + for queue in values(self.queues): queue.put(message) def _put(self, queue, message, **kwargs): @@ -56,7 +56,7 @@ class Channel(virtual.Channel): def close(self): super(Channel, self).close() - for queue in self.queues.values(): + for queue in values(self.queues): queue.empty() self.queues = {} diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 758c96c2..0a66f9d4 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -48,7 +48,7 @@ class Channel(virtual.Channel): def _get(self, queue): try: if queue in self._fanout_queues: - msg = self._queue_cursors[queue].next() + msg = next(self._queue_cursors[queue]) self._queue_readcounts[queue] += 1 return loads(msg['payload']) else: diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index 06d6fc60..27fb81c8 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -14,6 +14,7 @@ from kombu.exceptions import ( StdChannelError, VersionMismatch, ) +from kombu.five import items from kombu.utils.amq_manager import get_manager from . import base @@ -96,7 +97,7 @@ class Transport(base.Transport): def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client - for name, default_value in self.default_connection_params.items(): + for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) if conninfo.hostname == 'localhost': diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 62d684ae..74f60b6f 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -201,7 +201,7 @@ class QoS(object): unrestored = self.restore_unacked() if unrestored: - errors, messages = zip(*unrestored) + errors, messages = list(zip(*unrestored)) say('UNABLE TO RESTORE {0} MESSAGES: {1}', len(errors), errors) emergency_dump_state(messages) diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py index e7a78f97..315e571b 100644 --- a/kombu/transport/zmq.py +++ b/kombu/transport/zmq.py @@ -140,7 +140,7 @@ class Client(object): return self.sink.recv(flags=zmq.NOBLOCK) except ZMQError as exc: if exc.errno == zmq.EAGAIN: - raise socket.error(errno.EAGAIN, e.strerror) + raise socket.error(errno.EAGAIN, exc.strerror) else: raise |