diff options
Diffstat (limited to 'kombu')
35 files changed, 77 insertions, 93 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index 47857840..045ca7cb 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -28,6 +28,7 @@ from .log import get_logger from .transport import get_transport_cls, supports_librabbitmq from .utils import cached_property, retry_over_time, shufflecycle from .utils.compat import OrderedDict +from .utils.functional import promise from .utils.url import parse_url __all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] diff --git a/kombu/five.py b/kombu/five.py index f5454a94..3107a71b 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -20,14 +20,14 @@ except NameError: # pragma: no cover from imp import reload # noqa try: - from UserList import UserList # noqa -except ImportError: # pragma: no cover from collections import UserList # noqa +except ImportError: # pragma: no cover + from UserList import UserList # noqa try: - from UserDict import UserDict # noqa -except ImportError: # pragma: no cover from collections import UserDict # noqa +except ImportError: # pragma: no cover + from UserDict import UserDict # noqa if PY3: diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 4d38da56..836457eb 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -22,15 +22,15 @@ class Message(base.Message): class Channel(base.StdChannel): open = True throw_decode_error = False - _ids = count(1).next + _ids = count(1) def __init__(self, connection): self.connection = connection self.called = [] - self.deliveries = count(1).next + self.deliveries = count(1) self.to_deliver = [] self.events = {'basic_return': set()} - self.channel_id = self._ids() + self.channel_id = next(self._ids) def _called(self, name): self.called.append(name) @@ -105,7 +105,7 @@ class Channel(base.StdChannel): def message_to_python(self, message, *args, **kwargs): self._called('message_to_python') return Message(self, body=anyjson.dumps(message), - delivery_tag=self.deliveries(), + delivery_tag=next(self.deliveries), throw_decode_error=self.throw_decode_error, content_type='application/json', content_encoding='utf-8') diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index d315fdbb..c61a2076 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import socket @@ -157,13 +156,13 @@ class test_replies(TestCase): body, message = Mock(), Mock() itermessages.return_value = [(body, message)] it = collect_replies(conn, channel, queue, no_ack=False) - m = it.next() + m = next(it) self.assertIs(m, body) itermessages.assert_called_with(conn, channel, queue, no_ack=False) message.ack.assert_called_with() with self.assertRaises(StopIteration): - it.next() + next(it) channel.after_reply_message_received.assert_called_with(queue.name) @@ -173,7 +172,7 @@ class test_replies(TestCase): body, message = Mock(), Mock() itermessages.return_value = [(body, message)] it = collect_replies(conn, channel, queue) - m = it.next() + m = next(it) self.assertIs(m, body) itermessages.assert_called_with(conn, channel, queue, no_ack=True) self.assertFalse(message.ack.called) @@ -184,7 +183,7 @@ class test_replies(TestCase): itermessages.return_value = [] it = collect_replies(conn, channel, queue) with self.assertRaises(StopIteration): - it.next() + next(it) self.assertFalse(channel.after_reply_message_received.called) @@ -318,11 +317,11 @@ class test_itermessages(TestCase): it = common.itermessages(conn, channel, 'q', limit=1, Consumer=MockConsumer) - ret = it.next() + ret = next(it) self.assertTupleEqual(ret, ('body', 'message')) with self.assertRaises(StopIteration): - it.next() + next(it) def test_when_raises_socket_timeout(self): conn = self.MockConnection() @@ -333,7 +332,7 @@ class test_itermessages(TestCase): Consumer=MockConsumer) with self.assertRaises(StopIteration): - it.next() + next(it) @patch('kombu.common.deque') def test_when_raises_IndexError(self, deque): @@ -345,7 +344,7 @@ class test_itermessages(TestCase): Consumer=MockConsumer) with self.assertRaises(StopIteration): - it.next() + next(it) class test_entry_to_queue(TestCase): diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py index 7e80e21f..790e76ce 100644 --- a/kombu/tests/test_compat.py +++ b/kombu/tests/test_compat.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from mock import patch @@ -31,7 +30,7 @@ class test_misc(TestCase): conn = MyConnection() consumer = Consumer() it = compat._iterconsume(conn, consumer) - self.assertEqual(it.next(), 1) + self.assertEqual(next(it), 1) self.assertTrue(consumer.active) it2 = compat._iterconsume(conn, consumer, limit=10) @@ -243,7 +242,7 @@ class test_Consumer(TestCase): c = C(self.connection, queue=n, exchange=n, routing_key='rkey') - self.assertEqual(c.wait(10), range(10)) + self.assertEqual(c.wait(10), list(range(10))) c.close() def test_iterqueue(self, n='test_iterqueue'): @@ -258,7 +257,7 @@ class test_Consumer(TestCase): c = C(self.connection, queue=n, exchange=n, routing_key='rkey') - self.assertEqual(list(c.iterqueue(limit=10)), range(10)) + self.assertEqual(list(c.iterqueue(limit=10)), list(range(10))) c.close() diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index f3c4120a..be195c64 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import errno import pickle @@ -11,6 +10,7 @@ from nose import SkipTest from kombu import Connection, Consumer, Producer, parse_url from kombu.connection import Resource +from kombu.five import items from .mocks import Transport from .utils import TestCase @@ -60,7 +60,7 @@ class test_connection_utils(TestCase): def assert_info(self, conn, **fields): info = conn.info() - for field, expected in fields.iteritems(): + for field, expected in items(fields): self.assertEqual(info[field], expected) def test_rabbitmq_example_urls(self): diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 8e89f84e..efd27e8c 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import pickle diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 0fb8a657..296b02a2 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,5 +1,5 @@ from __future__ import absolute_import -from __future__ import with_statement +from __future__ import unicode_literals import anyjson import pickle @@ -68,7 +68,7 @@ class test_Producer(TestCase): 'p.declare() declares exchange') def test_prepare(self): - message = {u'the quick brown fox': u'jumps over the lazy dog'} + message = {'the quick brown fox': 'jumps over the lazy dog'} channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') m, ctype, cencoding = p._prepare(message, headers={}) @@ -77,7 +77,7 @@ class test_Producer(TestCase): self.assertEqual(cencoding, 'utf-8') def test_prepare_compression(self): - message = {u'the quick brown fox': u'jumps over the lazy dog'} + message = {'the quick brown fox': 'jumps over the lazy dog'} channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') headers = {} @@ -107,7 +107,7 @@ class test_Producer(TestCase): self.assertEqual(cencoding, 'alien') def test_prepare_is_already_unicode(self): - message = u'the quick brown fox' + message = 'the quick brown fox' channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') m, ctype, cencoding = p._prepare(message, content_type='text/plain') @@ -177,7 +177,7 @@ class test_Producer(TestCase): def test_publish(self): channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') - message = {u'the quick brown fox': u'jumps over the lazy dog'} + message = {'the quick brown fox': 'jumps over the lazy dog'} ret = p.publish(message, routing_key='process') self.assertIn('prepare_message', channel) self.assertIn('basic_publish', channel) @@ -409,11 +409,11 @@ class test_Consumer(TestCase): message.payload # trigger cache consumer.register_callback(callback) - consumer._receive_callback({u'foo': u'bar'}) + consumer._receive_callback({'foo': 'bar'}) self.assertIn('basic_ack', channel) self.assertIn('message_to_python', channel) - self.assertEqual(received[0], {u'foo': u'bar'}) + self.assertEqual(received[0], {'foo': 'bar'}) def test_basic_ack_twice(self): channel = self.connection.channel() diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py index d250eedb..28512023 100644 --- a/kombu/tests/test_pidbox.py +++ b/kombu/tests/test_pidbox.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import socket diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index 5bdde622..a9386602 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from kombu import Connection, Producer from kombu import pools diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index de7ed343..79e5fc02 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -*- coding: utf-8 -*- from __future__ import absolute_import -from __future__ import with_statement +from __future__ import unicode_literals import sys @@ -14,9 +14,9 @@ from .utils import TestCase from .utils import mask_modules, skip_if_not_module # For content_encoding tests -unicode_string = u'abcdé\u8463' +unicode_string = 'abcdé\u8463' unicode_string_as_utf8 = unicode_string.encode('utf-8') -latin_string = u'abcdé' +latin_string = 'abcdé' latin_string_as_latin1 = latin_string.encode('latin-1') latin_string_as_utf8 = latin_string.encode('utf-8') @@ -26,7 +26,7 @@ py_data = { 'string': 'The quick brown fox jumps over the lazy dog', 'int': 10, 'float': 3.14159265, - 'unicode': u'Thé quick brown fox jumps over thé lazy dog', + 'unicode': 'Thé quick brown fox jumps over thé lazy dog', 'list': ['george', 'jerry', 'elaine', 'cosmo'], } diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py index 26b09080..381d8a20 100644 --- a/kombu/tests/test_simple.py +++ b/kombu/tests/test_simple.py @@ -1,9 +1,7 @@ from __future__ import absolute_import -from __future__ import with_statement - -from Queue import Empty from kombu import Connection, Exchange, Queue +from kombu.five import Empty from .utils import TestCase from .utils import Mock diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index 298fc2bc..9dda129c 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -1,5 +1,5 @@ from __future__ import absolute_import -from __future__ import with_statement +from __future__ import unicode_literals import pickle import sys @@ -13,6 +13,7 @@ else: from StringIO import StringIO, StringIO as BytesIO # noqa from kombu import utils +from kombu.five import string_t from kombu.utils.compat import next from .utils import ( @@ -62,7 +63,7 @@ class test_utils(TestCase): [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) def test_reprkwargs(self): - self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, u'k': 'v'})) + self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'})) def test_reprcall(self): self.assertTrue( @@ -93,7 +94,7 @@ class test_UUID(TestCase): self.assertIsNone(ctypes) tid = uuid() self.assertTrue(tid) - self.assertIsInstance(tid, basestring) + self.assertIsInstance(tid, string_t) try: with_ctypes_masked() @@ -108,8 +109,8 @@ class test_Misc(TestCase): def f(**kwargs): return kwargs - kw = {u'foo': 'foo', - u'bar': 'bar'} + kw = {'foo': 'foo', + 'bar': 'bar'} self.assertTrue(f(**utils.kwdict(kw))) @@ -193,11 +194,11 @@ class test_retry_over_time(TestCase): def test_simple(self): prev_count, utils.count = utils.count, Mock() try: - utils.count.return_value = range(1) + utils.count.return_value = list(range(1)) x = utils.retry_over_time(self.myfun, self.Predicate, errback=None, interval_max=14) self.assertIsNone(x) - utils.count.return_value = range(10) + utils.count.return_value = list(range(10)) cb = Mock() x = utils.retry_over_time(self.myfun, self.Predicate, errback=self.errback, callback=cb, @@ -346,16 +347,16 @@ class test_shufflecycle(TestCase): def test_shuffles(self): prev_repeat, utils.repeat = utils.repeat, Mock() try: - utils.repeat.return_value = range(10) + utils.repeat.return_value = list(range(10)) values = set(['A', 'B', 'C']) cycle = utils.shufflecycle(values) seen = set() - for i in xrange(10): - cycle.next() + for i in range(10): + next(cycle) utils.repeat.assert_called_with(None) self.assertTrue(seen.issubset(values)) with self.assertRaises(StopIteration): - cycle.next() - cycle.next() + next(cycle) + next(cycle) finally: utils.repeat = prev_repeat diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py index c34871d7..2ba7156a 100644 --- a/kombu/tests/transport/test_base.py +++ b/kombu/tests/transport/test_base.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from kombu import Connection, Consumer, Producer, Queue from kombu.transport.base import Message, StdChannel, Transport diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py index b1d7c0cd..7bdad8f5 100644 --- a/kombu/tests/transport/test_filesystem.py +++ b/kombu/tests/transport/test_filesystem.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import tempfile diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py index d138e494..970681d7 100644 --- a/kombu/tests/transport/test_memory.py +++ b/kombu/tests/transport/test_memory.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import socket diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py index bd4e86bf..33c813c3 100644 --- a/kombu/tests/transport/test_pyamqp.py +++ b/kombu/tests/transport/test_pyamqp.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import sys diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 90e3a318..71ffaf7f 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import socket import types @@ -7,10 +6,10 @@ import types from anyjson import dumps from collections import defaultdict from itertools import count -from Queue import Empty, Queue as _Queue from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.exceptions import InconsistencyError, VersionMismatch +from kombu.five import Empty, Queue as _Queue from kombu.utils import eventio # patch poll from kombu.tests.utils import TestCase @@ -129,10 +128,10 @@ class Client(object): class _socket(object): blocking = True - next_fileno = count(30).next + filenos = count(30) def __init__(self, *args): - self._fileno = self.next_fileno() + self._fileno = next(self.filenos()) self.data = [] def fileno(self): diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index 27cc4323..f3d7d239 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from mock import patch from nose import SkipTest diff --git a/kombu/tests/transport/test_transport.py b/kombu/tests/transport/test_transport.py index 11cdcbe3..608b9767 100644 --- a/kombu/tests/transport/test_transport.py +++ b/kombu/tests/transport/test_transport.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from mock import patch diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index e219198f..8b6cff68 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import warnings @@ -67,7 +66,7 @@ class test_QoS(TestCase): self.q.append(i + 1, uuid()) self.assertFalse(self.q.can_consume()) - tag1 = iter(self.q._delivered).next() + tag1 = next(iter(self.q._delivered)) self.q.ack(tag1) self.assertTrue(self.q.can_consume()) diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py index 90127ba0..3c58e10c 100644 --- a/kombu/tests/transport/virtual/test_exchange.py +++ b/kombu/tests/transport/virtual/test_exchange.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from kombu import Connection from kombu.transport.virtual import exchange diff --git a/kombu/tests/transport/virtual/test_scheduling.py b/kombu/tests/transport/virtual/test_scheduling.py index 6f2d24f1..1a6c32bc 100644 --- a/kombu/tests/transport/virtual/test_scheduling.py +++ b/kombu/tests/transport/virtual/test_scheduling.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from kombu.transport.virtual.scheduling import FairCycle diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py index ccf4ec08..e030810e 100644 --- a/kombu/tests/utilities/test_amq_manager.py +++ b/kombu/tests/utilities/test_amq_manager.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement from mock import patch diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py index b36f9fcf..78c9f029 100644 --- a/kombu/tests/utilities/test_debug.py +++ b/kombu/tests/utilities/test_debug.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from __future__ import with_statement import logging diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py index b2942b67..c07bf315 100644 --- a/kombu/tests/utilities/test_encoding.py +++ b/kombu/tests/utilities/test_encoding.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import -from __future__ import with_statement +from __future__ import unicode_literals import sys @@ -8,6 +8,7 @@ from contextlib import contextmanager from mock import patch from nose import SkipTest +from kombu.five import bytes_t, string_t from kombu.utils.encoding import safe_str from kombu.tests.utils import TestCase @@ -46,12 +47,11 @@ class test_encoding_utils(TestCase): def test_str_to_bytes(self): with clean_encoding() as e: - self.assertIsInstance(e.str_to_bytes(u'foobar'), str) - self.assertIsInstance(e.str_to_bytes('foobar'), str) + self.assertIsInstance(e.str_to_bytes('foobar'), bytes_t) def test_from_utf8(self): with clean_encoding() as e: - self.assertIsInstance(e.from_utf8(u'foobar'), str) + self.assertIsInstance(e.from_utf8('foobar'), bytes_t) def test_default_encode(self): with clean_encoding() as e: @@ -64,12 +64,12 @@ class test_safe_str(TestCase): self.assertEqual(safe_str('foo'), 'foo') def test_when_unicode(self): - self.assertIsInstance(safe_str(u'foo'), str) + self.assertIsInstance(safe_str('foo'), string_t) def test_when_containing_high_chars(self): - s = u'The quiæk fåx jømps øver the lazy dåg' + s = 'The quiæk fåx jømps øver the lazy dåg' res = safe_str(s) - self.assertIsInstance(res, str) + self.assertIsInstance(res, string_t) def test_when_not_string(self): o = object() diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py index a8f92a37..e568e500 100644 --- a/kombu/tests/utils.py +++ b/kombu/tests/utils.py @@ -1,17 +1,17 @@ from __future__ import absolute_import -import __builtin__ import os import sys import types from functools import wraps -from StringIO import StringIO import mock from nose import SkipTest +from kombu.five import builtins, string_t, StringIO + try: import unittest unittest.skip @@ -76,7 +76,7 @@ def module_exists(*modules): @wraps(fun) def __inner(*args, **kwargs): for module in modules: - if isinstance(module, basestring): + if isinstance(module, string_t): module = types.ModuleType(module) sys.modules[module.__name__] = module try: @@ -95,7 +95,7 @@ def mask_modules(*modnames): @wraps(fun) def __inner(*args, **kwargs): - realimport = __builtin__.__import__ + realimport = builtins.__import__ def myimp(name, *args, **kwargs): if name in modnames: @@ -103,11 +103,11 @@ def mask_modules(*modnames): else: return realimport(name, *args, **kwargs) - __builtin__.__import__ = myimp + builtins.__import__ = myimp try: return fun(*args, **kwargs) finally: - __builtin__.__import__ = realimport + builtins.__import__ = realimport return __inner return _inner diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 75d8f0b8..c9acc71a 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -10,13 +10,12 @@ from __future__ import absolute_import import socket import string -from Queue import Empty - from anyjson import loads, dumps import softlayer_messaging import os +from kombu.five import Empty, text_t from kombu.utils import cached_property # , uuid from kombu.utils.encoding import safe_str @@ -54,7 +53,7 @@ class Channel(virtual.Channel): def entity_name(self, name, table=CHARS_REPLACE_TABLE): """Format AMQP queue name into a valid SLQS queue name.""" - return unicode(safe_str(name)).translate(table) + return text_t(safe_str(name)).translate(table) def _new_queue(self, queue, **kwargs): """Ensures a queue exists in SLQS.""" diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 99d4de65..00099694 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -331,7 +331,7 @@ class Transport(base.Transport): def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client - for name, default_value in self.default_connection_params.items(): + for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) if conninfo.hostname == 'localhost': diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index acfa33e7..e3211d65 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -22,6 +22,7 @@ except ImportError: raise ImportError("No module named librabbitmq") from kombu.exceptions import StdConnectionError, StdChannelError +from kombu.five import items from kombu.utils.amq_manager import get_manager from . import base @@ -95,7 +96,7 @@ class Transport(base.Transport): def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client - for name, default_value in self.default_connection_params.items(): + for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) conn = self.Connection(host=conninfo.host, diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 514b361a..2b763726 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -7,7 +7,7 @@ In-memory transport. """ from __future__ import absolute_import -from kombu.five import Queue +from kombu.five import Queue, values from . import virtual @@ -36,7 +36,7 @@ class Channel(virtual.Channel): pass def _put_fanout(self, exchange, message, **kwargs): - for queue in self.queues.values(): + for queue in values(self.queues): queue.put(message) def _put(self, queue, message, **kwargs): @@ -56,7 +56,7 @@ class Channel(virtual.Channel): def close(self): super(Channel, self).close() - for queue in self.queues.values(): + for queue in values(self.queues): queue.empty() self.queues = {} diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 758c96c2..0a66f9d4 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -48,7 +48,7 @@ class Channel(virtual.Channel): def _get(self, queue): try: if queue in self._fanout_queues: - msg = self._queue_cursors[queue].next() + msg = next(self._queue_cursors[queue]) self._queue_readcounts[queue] += 1 return loads(msg['payload']) else: diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index 06d6fc60..27fb81c8 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -14,6 +14,7 @@ from kombu.exceptions import ( StdChannelError, VersionMismatch, ) +from kombu.five import items from kombu.utils.amq_manager import get_manager from . import base @@ -96,7 +97,7 @@ class Transport(base.Transport): def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client - for name, default_value in self.default_connection_params.items(): + for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) if conninfo.hostname == 'localhost': diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 62d684ae..74f60b6f 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -201,7 +201,7 @@ class QoS(object): unrestored = self.restore_unacked() if unrestored: - errors, messages = zip(*unrestored) + errors, messages = list(zip(*unrestored)) say('UNABLE TO RESTORE {0} MESSAGES: {1}', len(errors), errors) emergency_dump_state(messages) diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py index e7a78f97..315e571b 100644 --- a/kombu/transport/zmq.py +++ b/kombu/transport/zmq.py @@ -140,7 +140,7 @@ class Client(object): return self.sink.recv(flags=zmq.NOBLOCK) except ZMQError as exc: if exc.errno == zmq.EAGAIN: - raise socket.error(errno.EAGAIN, e.strerror) + raise socket.error(errno.EAGAIN, exc.strerror) else: raise |