summaryrefslogtreecommitdiff
path: root/kombu/transport
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-02-12 16:37:01 +0000
committerAsk Solem <ask@celeryproject.org>2013-02-12 16:37:01 +0000
commitb0565f750929275b035108de4880e78c1327dbd0 (patch)
tree8971fe3047b305322f0d160b7523d019bace76bd /kombu/transport
parentd84c1fca5b7fb50610f90f2447ba30d9641e5fb4 (diff)
downloadkombu-b0565f750929275b035108de4880e78c1327dbd0.tar.gz
python 3 fixes
Diffstat (limited to 'kombu/transport')
-rw-r--r--kombu/transport/SLMQ.py5
-rw-r--r--kombu/transport/amqplib.py2
-rw-r--r--kombu/transport/librabbitmq.py3
-rw-r--r--kombu/transport/memory.py6
-rw-r--r--kombu/transport/mongodb.py2
-rw-r--r--kombu/transport/pyamqp.py3
-rw-r--r--kombu/transport/virtual/__init__.py2
-rw-r--r--kombu/transport/zmq.py2
8 files changed, 13 insertions, 12 deletions
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py
index 75d8f0b8..c9acc71a 100644
--- a/kombu/transport/SLMQ.py
+++ b/kombu/transport/SLMQ.py
@@ -10,13 +10,12 @@ from __future__ import absolute_import
import socket
import string
-from Queue import Empty
-
from anyjson import loads, dumps
import softlayer_messaging
import os
+from kombu.five import Empty, text_t
from kombu.utils import cached_property # , uuid
from kombu.utils.encoding import safe_str
@@ -54,7 +53,7 @@ class Channel(virtual.Channel):
def entity_name(self, name, table=CHARS_REPLACE_TABLE):
"""Format AMQP queue name into a valid SLQS queue name."""
- return unicode(safe_str(name)).translate(table)
+ return text_t(safe_str(name)).translate(table)
def _new_queue(self, queue, **kwargs):
"""Ensures a queue exists in SLQS."""
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index 99d4de65..00099694 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -331,7 +331,7 @@ class Transport(base.Transport):
def establish_connection(self):
"""Establish connection to the AMQP broker."""
conninfo = self.client
- for name, default_value in self.default_connection_params.items():
+ for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
if conninfo.hostname == 'localhost':
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index acfa33e7..e3211d65 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -22,6 +22,7 @@ except ImportError:
raise ImportError("No module named librabbitmq")
from kombu.exceptions import StdConnectionError, StdChannelError
+from kombu.five import items
from kombu.utils.amq_manager import get_manager
from . import base
@@ -95,7 +96,7 @@ class Transport(base.Transport):
def establish_connection(self):
"""Establish connection to the AMQP broker."""
conninfo = self.client
- for name, default_value in self.default_connection_params.items():
+ for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
conn = self.Connection(host=conninfo.host,
diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py
index 514b361a..2b763726 100644
--- a/kombu/transport/memory.py
+++ b/kombu/transport/memory.py
@@ -7,7 +7,7 @@ In-memory transport.
"""
from __future__ import absolute_import
-from kombu.five import Queue
+from kombu.five import Queue, values
from . import virtual
@@ -36,7 +36,7 @@ class Channel(virtual.Channel):
pass
def _put_fanout(self, exchange, message, **kwargs):
- for queue in self.queues.values():
+ for queue in values(self.queues):
queue.put(message)
def _put(self, queue, message, **kwargs):
@@ -56,7 +56,7 @@ class Channel(virtual.Channel):
def close(self):
super(Channel, self).close()
- for queue in self.queues.values():
+ for queue in values(self.queues):
queue.empty()
self.queues = {}
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 758c96c2..0a66f9d4 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -48,7 +48,7 @@ class Channel(virtual.Channel):
def _get(self, queue):
try:
if queue in self._fanout_queues:
- msg = self._queue_cursors[queue].next()
+ msg = next(self._queue_cursors[queue])
self._queue_readcounts[queue] += 1
return loads(msg['payload'])
else:
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index 06d6fc60..27fb81c8 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -14,6 +14,7 @@ from kombu.exceptions import (
StdChannelError,
VersionMismatch,
)
+from kombu.five import items
from kombu.utils.amq_manager import get_manager
from . import base
@@ -96,7 +97,7 @@ class Transport(base.Transport):
def establish_connection(self):
"""Establish connection to the AMQP broker."""
conninfo = self.client
- for name, default_value in self.default_connection_params.items():
+ for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
if conninfo.hostname == 'localhost':
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 62d684ae..74f60b6f 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -201,7 +201,7 @@ class QoS(object):
unrestored = self.restore_unacked()
if unrestored:
- errors, messages = zip(*unrestored)
+ errors, messages = list(zip(*unrestored))
say('UNABLE TO RESTORE {0} MESSAGES: {1}',
len(errors), errors)
emergency_dump_state(messages)
diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py
index e7a78f97..315e571b 100644
--- a/kombu/transport/zmq.py
+++ b/kombu/transport/zmq.py
@@ -140,7 +140,7 @@ class Client(object):
return self.sink.recv(flags=zmq.NOBLOCK)
except ZMQError as exc:
if exc.errno == zmq.EAGAIN:
- raise socket.error(errno.EAGAIN, e.strerror)
+ raise socket.error(errno.EAGAIN, exc.strerror)
else:
raise