summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS1
-rw-r--r--docs/_ext/applyxrefs.py4
-rw-r--r--docs/_ext/literals_to_xrefs.py6
-rw-r--r--docs/conf.py4
-rw-r--r--examples/simple_task_queue/client.py9
-rwxr-xr-xextra/release/bump_version.py2
-rwxr-xr-xextra/release/flakeplus.py2
-rw-r--r--funtests/setup.py2
-rw-r--r--funtests/transport.py18
-rw-r--r--kombu/messaging.py5
-rw-r--r--kombu/tests/__init__.py5
-rw-r--r--kombu/tests/mocks.py4
-rw-r--r--kombu/tests/test_common.py17
-rw-r--r--kombu/tests/test_compat.py7
-rw-r--r--kombu/tests/test_connection.py8
-rw-r--r--kombu/tests/test_entities.py9
-rw-r--r--kombu/tests/test_messaging.py20
-rw-r--r--kombu/tests/test_pidbox.py9
-rw-r--r--kombu/tests/test_pools.py2
-rw-r--r--kombu/tests/test_serialization.py151
-rw-r--r--kombu/tests/test_simple.py4
-rw-r--r--kombu/tests/test_utils.py42
-rw-r--r--kombu/tests/transport/test_amqplib.py5
-rw-r--r--kombu/tests/transport/test_base.py1
-rw-r--r--kombu/tests/transport/test_filesystem.py1
-rw-r--r--kombu/tests/transport/test_memory.py1
-rw-r--r--kombu/tests/transport/test_pyamqp.py6
-rw-r--r--kombu/tests/transport/test_redis.py36
-rw-r--r--kombu/tests/transport/test_sqlalchemy.py1
-rw-r--r--kombu/tests/transport/test_transport.py1
-rw-r--r--kombu/tests/transport/virtual/test_base.py15
-rw-r--r--kombu/tests/transport/virtual/test_exchange.py29
-rw-r--r--kombu/tests/transport/virtual/test_scheduling.py1
-rw-r--r--kombu/tests/utilities/test_amq_manager.py1
-rw-r--r--kombu/tests/utilities/test_debug.py1
-rw-r--r--kombu/tests/utilities/test_encoding.py13
-rw-r--r--kombu/tests/utils.py22
-rw-r--r--kombu/transport/SQS.py7
-rw-r--r--kombu/transport/virtual/__init__.py11
-rw-r--r--kombu/utils/amq_manager.py2
-rw-r--r--pavement.py9
-rw-r--r--setup.py5
42 files changed, 292 insertions, 207 deletions
diff --git a/AUTHORS b/AUTHORS
index cc72758a..25214355 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -5,6 +5,7 @@
Adam Nelson <adam@varud.com>
Adam Wentz
+Alex Koshelev <daevaorn@gmail.com>
Alexandre Bourget <alexandre.bourget@savoirfairelinux.com>
Andrew Watts
Andrey Antukh <niwi@niwi.be>
diff --git a/docs/_ext/applyxrefs.py b/docs/_ext/applyxrefs.py
index 027490b8..93222a33 100644
--- a/docs/_ext/applyxrefs.py
+++ b/docs/_ext/applyxrefs.py
@@ -5,9 +5,7 @@ import os
testing = False
-DONT_TOUCH = (
- './index.txt',
- )
+DONT_TOUCH = ('./index.txt', )
def target_name(fn):
diff --git a/docs/_ext/literals_to_xrefs.py b/docs/_ext/literals_to_xrefs.py
index e9dc7ca0..f1497565 100644
--- a/docs/_ext/literals_to_xrefs.py
+++ b/docs/_ext/literals_to_xrefs.py
@@ -95,8 +95,10 @@ def fixliterals(fname):
replace_type in ("class", "func", "meth"):
default = default[:-2]
replace_value = raw_input(
- colorize("Text <target> [", fg="yellow") + default + \
- colorize("]: ", fg="yellow")).strip()
+ colorize("Text <target> [", fg="yellow") +
+ default +
+ colorize("]: ", fg="yellow"),
+ ).strip()
if not replace_value:
replace_value = default
new.append(":%s:`%s`" % (replace_type, replace_value))
diff --git a/docs/conf.py b/docs/conf.py
index 27534746..bf39ded2 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -62,8 +62,8 @@ html_use_modindex = True
html_use_index = True
latex_documents = [
- ('index', 'Kombu.tex', ur'Kombu Documentation',
- ur'Ask Solem', 'manual'),
+ ('index', 'Kombu.tex', u'Kombu Documentation',
+ u'Ask Solem', 'manual'),
]
html_theme = "celery"
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py
index 8ed33f36..8cad069e 100644
--- a/examples/simple_task_queue/client.py
+++ b/examples/simple_task_queue/client.py
@@ -14,10 +14,11 @@ def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
- producer.publish(payload, serializer='pickle',
- compression='bzip2',
- exchange=task_exchange,
- routing_key=routing_key)
+ producer.publish(payload,
+ serializer='pickle',
+ compression='bzip2',
+ exchange=task_exchange,
+ routing_key=routing_key)
if __name__ == '__main__':
from kombu import Connection
diff --git a/extra/release/bump_version.py b/extra/release/bump_version.py
index abda578e..24d0ebcb 100755
--- a/extra/release/bump_version.py
+++ b/extra/release/bump_version.py
@@ -147,7 +147,7 @@ def bump(*files, **kwargs):
v.write(next)
print(cmd("git", "commit", "-m", "Bumps version to %s" % (to_str(next), ),
- *[f.filename for f in files]))
+ *[f.filename for f in files]))
print(cmd("git", "tag", "v%s" % (to_str(next), )))
diff --git a/extra/release/flakeplus.py b/extra/release/flakeplus.py
index 6fe1f1fc..4e1efd47 100755
--- a/extra/release/flakeplus.py
+++ b/extra/release/flakeplus.py
@@ -37,7 +37,7 @@ class FlakePP(object):
re_with = compile(RE_WITH)
re_noqa = compile(RE_NOQA)
map = {"abs": True, "print": False,
- "with": False, "with-used": False}
+ "with": False, "with-used": False}
def __init__(self, verbose=False):
self.verbose = verbose
diff --git a/funtests/setup.py b/funtests/setup.py
index fdb7a98c..a7ad8b4e 100644
--- a/funtests/setup.py
+++ b/funtests/setup.py
@@ -8,7 +8,7 @@ except ImportError:
from ez_setup import use_setuptools
use_setuptools()
from setuptools import setup # noqa
- from setuptools.command.install import install # noqa
+ from setuptools.command.install import install # noqa
class no_install(install):
diff --git a/funtests/transport.py b/funtests/transport.py
index 5924a72a..f6ae1e17 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -40,7 +40,7 @@ def consumeN(conn, consumer, n=1, timeout=30):
except socket.timeout:
seconds += 1
msg = "Received %s/%s messages. %s seconds passed." % (
- len(messages), n, seconds)
+ len(messages), n, seconds)
if seconds >= timeout:
raise socket.timeout(msg)
if seconds > 1:
@@ -113,7 +113,7 @@ class TransportCase(unittest.TestCase):
self.after_connect(self.connection)
except self.connection.connection_errors:
self.skip_test_reason = "%s transport can't connect" % (
- self.transport, )
+ self.transport, )
else:
self.connected = True
@@ -161,14 +161,15 @@ class TransportCase(unittest.TestCase):
return _digest(data).hexdigest()
@skip_if_quick
- def test_produce__consume_large_messages(self, bytes=1048576, n=10,
+ def test_produce__consume_large_messages(
+ self, bytes=1048576, n=10,
charset=string.punctuation + string.letters + string.digits):
if not self.verify_alive():
return
bytes = min(filter(None, [bytes, self.message_size_limit]))
messages = ["".join(random.choice(charset)
- for j in xrange(bytes)) + "--%s" % n
- for i in xrange(n)]
+ for j in xrange(bytes)) + "--%s" % n
+ for i in xrange(n)]
digests = []
chan1 = self.connection.channel()
consumer = chan1.Consumer(self.queue)
@@ -180,7 +181,7 @@ class TransportCase(unittest.TestCase):
digests.append(self._digest(message))
received = [(msg["i"], msg["text"])
- for msg in consumeN(self.connection, consumer, n)]
+ for msg in consumeN(self.connection, consumer, n)]
self.assertEqual(len(received), n)
ordering = [i for i, _ in received]
if ordering != range(n) and not self.suppress_disorder_warning:
@@ -229,8 +230,9 @@ class TransportCase(unittest.TestCase):
chan = self.connection.channel()
self.purge([self.queue.name])
consumer = chan.Consumer(self.queue)
- self.assertRaises(socket.timeout, self.connection.drain_events,
- timeout=0.3)
+ self.assertRaises(
+ socket.timeout, self.connection.drain_events, timeout=0.3,
+ )
consumer.cancel()
chan.close()
diff --git a/kombu/messaging.py b/kombu/messaging.py
index b3fba4bc..e39625a3 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -380,7 +380,10 @@ class Consumer(object):
return self
def __exit__(self, *exc_info):
- self.cancel()
+ try:
+ self.cancel()
+ except Exception:
+ pass
def add_queue(self, queue):
queue = queue(self.channel)
diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py
index f6db3faa..6a13a750 100644
--- a/kombu/tests/__init__.py
+++ b/kombu/tests/__init__.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import
import anyjson
import os
@@ -33,8 +33,7 @@ def find_distribution_modules(name=__name__, file=__file__):
def import_all_modules(name=__name__, file=__file__, skip=[]):
for module in find_distribution_modules(name, file):
if module not in skip:
- print('preimporting {0} for coverage...'.format(module),
- file=sys.stderr)
+ print('preimporting %r for coverage...' % (module, ))
try:
__import__(module)
except (ImportError, VersionMismatch, AttributeError):
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index 5366226a..4d38da56 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -27,7 +27,7 @@ class Channel(base.StdChannel):
def __init__(self, connection):
self.connection = connection
self.called = []
- self.deliveries = count(1)
+ self.deliveries = count(1).next
self.to_deliver = []
self.events = {'basic_return': set()}
self.channel_id = self._ids()
@@ -105,7 +105,7 @@ class Channel(base.StdChannel):
def message_to_python(self, message, *args, **kwargs):
self._called('message_to_python')
return Message(self, body=anyjson.dumps(message),
- delivery_tag=next(self.deliveries),
+ delivery_tag=self.deliveries(),
throw_decode_error=self.throw_decode_error,
content_type='application/json',
content_encoding='utf-8')
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 6f1311d3..978687e3 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import socket
@@ -156,13 +157,13 @@ class test_replies(TestCase):
body, message = Mock(), Mock()
itermessages.return_value = [(body, message)]
it = collect_replies(conn, channel, queue, no_ack=False)
- m = next(it)
+ m = it.next()
self.assertIs(m, body)
itermessages.assert_called_with(conn, channel, queue, no_ack=False)
message.ack.assert_called_with()
with self.assertRaises(StopIteration):
- next(it)
+ it.next()
channel.after_reply_message_received.assert_called_with(queue.name)
@@ -172,7 +173,7 @@ class test_replies(TestCase):
body, message = Mock(), Mock()
itermessages.return_value = [(body, message)]
it = collect_replies(conn, channel, queue)
- m = next(it)
+ m = it.next()
self.assertIs(m, body)
itermessages.assert_called_with(conn, channel, queue, no_ack=True)
self.assertFalse(message.ack.called)
@@ -183,7 +184,7 @@ class test_replies(TestCase):
itermessages.return_value = []
it = collect_replies(conn, channel, queue)
with self.assertRaises(StopIteration):
- next(it)
+ it.next()
self.assertFalse(channel.after_reply_message_received.called)
@@ -317,11 +318,11 @@ class test_itermessages(TestCase):
it = common.itermessages(conn, channel, 'q', limit=1,
Consumer=MockConsumer)
- ret = next(it)
+ ret = it.next()
self.assertTupleEqual(ret, ('body', 'message'))
with self.assertRaises(StopIteration):
- next(it)
+ it.next()
def test_when_raises_socket_timeout(self):
conn = self.MockConnection()
@@ -332,7 +333,7 @@ class test_itermessages(TestCase):
Consumer=MockConsumer)
with self.assertRaises(StopIteration):
- next(it)
+ it.next()
@patch('kombu.common.deque')
def test_when_raises_IndexError(self, deque):
@@ -344,7 +345,7 @@ class test_itermessages(TestCase):
Consumer=MockConsumer)
with self.assertRaises(StopIteration):
- next(it)
+ it.next()
class test_entry_to_queue(TestCase):
diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py
index 790e76ce..7e80e21f 100644
--- a/kombu/tests/test_compat.py
+++ b/kombu/tests/test_compat.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from mock import patch
@@ -30,7 +31,7 @@ class test_misc(TestCase):
conn = MyConnection()
consumer = Consumer()
it = compat._iterconsume(conn, consumer)
- self.assertEqual(next(it), 1)
+ self.assertEqual(it.next(), 1)
self.assertTrue(consumer.active)
it2 = compat._iterconsume(conn, consumer, limit=10)
@@ -242,7 +243,7 @@ class test_Consumer(TestCase):
c = C(self.connection,
queue=n, exchange=n, routing_key='rkey')
- self.assertEqual(c.wait(10), list(range(10)))
+ self.assertEqual(c.wait(10), range(10))
c.close()
def test_iterqueue(self, n='test_iterqueue'):
@@ -257,7 +258,7 @@ class test_Consumer(TestCase):
c = C(self.connection,
queue=n, exchange=n, routing_key='rkey')
- self.assertEqual(list(c.iterqueue(limit=10)), list(range(10)))
+ self.assertEqual(list(c.iterqueue(limit=10)), range(10))
c.close()
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index d0aac2ed..f3c4120a 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import errno
import pickle
@@ -10,7 +11,6 @@ from nose import SkipTest
from kombu import Connection, Consumer, Producer, parse_url
from kombu.connection import Resource
-from kombu.five import items
from .mocks import Transport
from .utils import TestCase
@@ -43,7 +43,7 @@ class test_connection_utils(TestCase):
def test_parse_generated_as_uri(self):
conn = Connection(self.url)
info = conn.info()
- for k, v in items(self.expected):
+ for k, v in self.expected.items():
self.assertEqual(info[k], v)
# by default almost the same- no password
self.assertEqual(conn.as_uri(), self.nopass)
@@ -60,7 +60,7 @@ class test_connection_utils(TestCase):
def assert_info(self, conn, **fields):
info = conn.info()
- for field, expected in items(fields):
+ for field, expected in fields.iteritems():
self.assertEqual(info[field], expected)
def test_rabbitmq_example_urls(self):
@@ -505,7 +505,7 @@ class ResourceCase(TestCase):
return
P = self.create_resource(10, 0)
self.assertState(P, 10, 0)
- chans = [P.acquire() for _ in range(10)]
+ chans = [P.acquire() for _ in xrange(10)]
self.assertState(P, 0, 10)
with self.assertRaises(P.LimitExceeded):
P.acquire()
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index ff1a160f..8e89f84e 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import pickle
@@ -243,7 +244,8 @@ class test_Queue(TestCase):
arguments=None,
type='direct',
durable=True,
- ), chan.exchange_declare.call_args_list,
+ ),
+ chan.exchange_declare.call_args_list,
)
def test_can_cache_declaration(self):
@@ -265,9 +267,8 @@ class test_Queue(TestCase):
)
def test_binds_at_instantiation(self):
- self.assertTrue(
- Queue('foo', self.exchange, channel=get_conn().channel()).is_bound,
- )
+ self.assertTrue(Queue('foo', self.exchange,
+ channel=get_conn().channel()).is_bound)
def test_also_binds_exchange(self):
chan = get_conn().channel()
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 256352b3..0fb8a657 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -1,4 +1,5 @@
-from __future__ import absolute_import, unicode_literals
+from __future__ import absolute_import
+from __future__ import with_statement
import anyjson
import pickle
@@ -67,7 +68,7 @@ class test_Producer(TestCase):
'p.declare() declares exchange')
def test_prepare(self):
- message = {'the quick brown fox': 'jumps over the lazy dog'}
+ message = {u'the quick brown fox': u'jumps over the lazy dog'}
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
m, ctype, cencoding = p._prepare(message, headers={})
@@ -76,7 +77,7 @@ class test_Producer(TestCase):
self.assertEqual(cencoding, 'utf-8')
def test_prepare_compression(self):
- message = {'the quick brown fox': 'jumps over the lazy dog'}
+ message = {u'the quick brown fox': u'jumps over the lazy dog'}
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
headers = {}
@@ -106,16 +107,15 @@ class test_Producer(TestCase):
self.assertEqual(cencoding, 'alien')
def test_prepare_is_already_unicode(self):
- message = 'the quick brown fox'
+ message = u'the quick brown fox'
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
m, ctype, cencoding = p._prepare(message, content_type='text/plain')
self.assertEqual(m, message.encode('utf-8'))
self.assertEqual(ctype, 'text/plain')
self.assertEqual(cencoding, 'utf-8')
- m, ctype, cencoding = p._prepare(
- message, content_type='text/plain', content_encoding='utf-8',
- )
+ m, ctype, cencoding = p._prepare(message, content_type='text/plain',
+ content_encoding='utf-8')
self.assertEqual(m, message.encode('utf-8'))
self.assertEqual(ctype, 'text/plain')
self.assertEqual(cencoding, 'utf-8')
@@ -177,7 +177,7 @@ class test_Producer(TestCase):
def test_publish(self):
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
- message = {'the quick brown fox': 'jumps over the lazy dog'}
+ message = {u'the quick brown fox': u'jumps over the lazy dog'}
ret = p.publish(message, routing_key='process')
self.assertIn('prepare_message', channel)
self.assertIn('basic_publish', channel)
@@ -409,11 +409,11 @@ class test_Consumer(TestCase):
message.payload # trigger cache
consumer.register_callback(callback)
- consumer._receive_callback({'foo': 'bar'})
+ consumer._receive_callback({u'foo': u'bar'})
self.assertIn('basic_ack', channel)
self.assertIn('message_to_python', channel)
- self.assertEqual(received[0], {'foo': 'bar'})
+ self.assertEqual(received[0], {u'foo': u'bar'})
def test_basic_ack_twice(self):
channel = self.connection.channel()
diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py
index 1eaec780..d250eedb 100644
--- a/kombu/tests/test_pidbox.py
+++ b/kombu/tests/test_pidbox.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import socket
@@ -28,9 +29,11 @@ class test_Mailbox(TestCase):
self.handlers = {'mymethod': self._handler}
self.bound = self.mailbox(self.connection)
self.default_chan = self.connection.channel()
- self.node = self.bound.Node('test_pidbox', state=self.state,
- handlers=self.handlers,
- channel=self.default_chan)
+ self.node = self.bound.Node(
+ 'test_pidbox',
+ state=self.state, handlers=self.handlers,
+ channel=self.default_chan,
+ )
def test_reply__collect(self):
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index a9386602..1ec34395 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import with_statement
from kombu import Connection, Producer
from kombu import pools
diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py
index 3d920cf8..de7ed343 100644
--- a/kombu/tests/test_serialization.py
+++ b/kombu/tests/test_serialization.py
@@ -1,29 +1,22 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
-from __future__ import absolute_import, unicode_literals
-
-from kombu.serialization import (
- registry,
- register,
- SerializerNotInstalled,
- raw_encode,
- register_yaml,
- register_msgpack,
- decode,
- bytes_t,
- pickle,
- pickle_protocol,
- unregister,
- register_pickle,
-)
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import sys
+
+from kombu.serialization import (registry, register, SerializerNotInstalled,
+ raw_encode, register_yaml, register_msgpack,
+ decode, bytes_t, pickle, pickle_protocol,
+ unregister, register_pickle)
from .utils import TestCase
from .utils import mask_modules, skip_if_not_module
# For content_encoding tests
-unicode_string = 'abcdé\u8463'
+unicode_string = u'abcdé\u8463'
unicode_string_as_utf8 = unicode_string.encode('utf-8')
-latin_string = 'abcdé'
+latin_string = u'abcdé'
latin_string_as_latin1 = latin_string.encode('latin-1')
latin_string_as_utf8 = latin_string.encode('utf-8')
@@ -33,39 +26,48 @@ py_data = {
'string': 'The quick brown fox jumps over the lazy dog',
'int': 10,
'float': 3.14159265,
- 'unicode': 'Thé quick brown fox jumps over thé lazy dog',
+ 'unicode': u'Thé quick brown fox jumps over thé lazy dog',
'list': ['george', 'jerry', 'elaine', 'cosmo'],
}
# JSON serialization tests
-json_data = ('{"int": 10, "float": 3.1415926500000002, '
- '"list": ["george", "jerry", "elaine", "cosmo"], '
- '"string": "The quick brown fox jumps over the lazy '
- 'dog", "unicode": "Th\\u00e9 quick brown fox jumps over '
- 'th\\u00e9 lazy dog"}')
+json_data = """\
+{"int": 10, "float": 3.1415926500000002, \
+"list": ["george", "jerry", "elaine", "cosmo"], \
+"string": "The quick brown fox jumps over the lazy \
+dog", "unicode": "Th\\u00e9 quick brown fox jumps over \
+th\\u00e9 lazy dog"}\
+"""
# Pickle serialization tests
pickle_data = pickle.dumps(py_data, protocol=pickle_protocol)
# YAML serialization tests
-yaml_data = ('float: 3.1415926500000002\nint: 10\n'
- 'list: [george, jerry, elaine, cosmo]\n'
- 'string: The quick brown fox jumps over the lazy dog\n'
- 'unicode: "Th\\xE9 quick brown fox '
- 'jumps over th\\xE9 lazy dog"\n')
+yaml_data = """\
+float: 3.1415926500000002
+int: 10
+list: [george, jerry, elaine, cosmo]
+string: The quick brown fox jumps over the lazy dog
+unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
+"""
-msgpack_py_data = {
- 'string': 'The quick brown fox jumps over the lazy dog',
- 'int': 10,
- 'float': 3.14159265,
- 'unicode': 'Thé quick brown fox jumps over thé lazy dog',
- 'list': ['george', 'jerry', 'elaine', 'cosmo'],
-}
+msgpack_py_data = dict(py_data)
# msgpack only supports tuples
msgpack_py_data['list'] = tuple(msgpack_py_data['list'])
# Unicode chars are lost in transmit :(
msgpack_py_data['unicode'] = 'Th quick brown fox jumps over th lazy dog'
+msgpack_data = """\
+\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list\
+\x94\xa6george\xa5jerry\xa6elaine\xa5cosmo\xa6string\xda\
+\x00+The quick brown fox jumps over the lazy dog\xa7unicode\
+\xda\x00)Th quick brown fox jumps over th lazy dog\
+"""
+
+
+def say(m):
+ sys.stderr.write('%s\n' % (m, ))
+
registry.register('testS', lambda s: s, lambda s: 'decoded',
'application/testS', 'utf-8')
@@ -91,11 +93,13 @@ class test_Serialization(TestCase):
registry.disable('testS')
with self.assertRaises(SerializerNotInstalled):
- registry.decode('xxd', 'application/testS', 'utf-8',
- force=False)
+ registry.decode(
+ 'xxd', 'application/testS', 'utf-8', force=False,
+ )
- ret = registry.decode('xxd', 'application/testS', 'utf-8',
- force=True)
+ ret = registry.decode(
+ 'xxd', 'application/testS', 'utf-8', force=True,
+ )
self.assertEqual(ret, 'decoded')
finally:
disabled.clear()
@@ -106,17 +110,15 @@ class test_Serialization(TestCase):
def test_content_type_decoding(self):
self.assertEqual(
unicode_string,
- registry.decode(
- unicode_string_as_utf8,
- content_type='plain/text',
- content_encoding='utf-8'),
+ registry.decode(unicode_string_as_utf8,
+ content_type='plain/text',
+ content_encoding='utf-8'),
)
self.assertEqual(
latin_string,
- registry.decode(
- latin_string_as_latin1,
- content_type='application/data',
- content_encoding='latin-1'),
+ registry.decode(latin_string_as_latin1,
+ content_type='application/data',
+ content_encoding='latin-1'),
)
def test_content_type_binary(self):
@@ -158,10 +160,9 @@ class test_Serialization(TestCase):
def test_json_decode(self):
self.assertEqual(
py_data,
- registry.decode(
- json_data,
- content_type='application/json',
- content_encoding='utf-8'),
+ registry.decode(json_data,
+ content_type='application/json',
+ content_encoding='utf-8'),
)
def test_json_encode(self):
@@ -174,18 +175,35 @@ class test_Serialization(TestCase):
registry.decode(
json_data,
content_type='application/json',
- content_encoding='utf-8'),
+ content_encoding='utf-8',
+ ),
)
@skip_if_not_module('msgpack')
def test_msgpack_decode(self):
register_msgpack()
- decoded = registry.decode(
- registry.encode(msgpack_py_data, serializer='msgpack')[-1],
- content_type='application/x-msgpack',
- content_encoding='binary',
+ self.assertEqual(
+ msgpack_py_data,
+ registry.decode(msgpack_data,
+ content_type='application/x-msgpack',
+ content_encoding='binary'),
+ )
+
+ @skip_if_not_module('msgpack')
+ def test_msgpack_encode(self):
+ register_msgpack()
+ self.assertEqual(
+ registry.decode(
+ registry.encode(msgpack_py_data, serializer='msgpack')[-1],
+ content_type='application/x-msgpack',
+ content_encoding='binary',
+ ),
+ registry.decode(
+ msgpack_data,
+ content_type='application/x-msgpack',
+ content_encoding='binary',
+ ),
)
- self.assertEqual(msgpack_py_data, decoded)
@skip_if_not_module('yaml')
def test_yaml_decode(self):
@@ -209,16 +227,16 @@ class test_Serialization(TestCase):
registry.decode(
yaml_data,
content_type='application/x-yaml',
- content_encoding='utf-8'),
+ content_encoding='utf-8',
+ ),
)
def test_pickle_decode(self):
self.assertEqual(
py_data,
- registry.decode(
- pickle_data,
- content_type='application/x-python-serialize',
- content_encoding='binary'),
+ registry.decode(pickle_data,
+ content_type='application/x-python-serialize',
+ content_encoding='binary'),
)
def test_pickle_encode(self):
@@ -248,9 +266,10 @@ class test_Serialization(TestCase):
registry.encode('foo', serializer='nonexisting')
def test_raw_encode(self):
- self.assertTupleEqual(raw_encode('foo'.encode('utf-8')),
- ('application/data', 'binary',
- 'foo'.encode('utf-8')))
+ self.assertTupleEqual(
+ raw_encode('foo'.encode('utf-8')),
+ ('application/data', 'binary', 'foo'.encode('utf-8')),
+ )
@mask_modules('yaml')
def test_register_yaml__no_yaml(self):
diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py
index 381d8a20..26b09080 100644
--- a/kombu/tests/test_simple.py
+++ b/kombu/tests/test_simple.py
@@ -1,7 +1,9 @@
from __future__ import absolute_import
+from __future__ import with_statement
+
+from Queue import Empty
from kombu import Connection, Exchange, Queue
-from kombu.five import Empty
from .utils import TestCase
from .utils import Mock
diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py
index 338c58e7..298fc2bc 100644
--- a/kombu/tests/test_utils.py
+++ b/kombu/tests/test_utils.py
@@ -1,14 +1,19 @@
-from __future__ import absolute_import, unicode_literals
+from __future__ import absolute_import
+from __future__ import with_statement
import pickle
import sys
from functools import wraps
-from io import BytesIO, StringIO
from mock import Mock, patch
+if sys.version_info >= (3, 0):
+ from io import StringIO, BytesIO
+else:
+ from StringIO import StringIO, StringIO as BytesIO # noqa
+
from kombu import utils
-from kombu.five import string_t
+from kombu.utils.compat import next
from .utils import (
TestCase,
@@ -57,7 +62,7 @@ class test_utils(TestCase):
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
def test_reprkwargs(self):
- self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'}))
+ self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, u'k': 'v'}))
def test_reprcall(self):
self.assertTrue(
@@ -88,7 +93,7 @@ class test_UUID(TestCase):
self.assertIsNone(ctypes)
tid = uuid()
self.assertTrue(tid)
- self.assertIsInstance(tid, string_t)
+ self.assertIsInstance(tid, basestring)
try:
with_ctypes_masked()
@@ -103,8 +108,8 @@ class test_Misc(TestCase):
def f(**kwargs):
return kwargs
- kw = {'foo': 'foo',
- 'bar': 'bar'}
+ kw = {u'foo': 'foo',
+ u'bar': 'bar'}
self.assertTrue(f(**utils.kwdict(kw)))
@@ -142,8 +147,7 @@ class test_emergency_dump_state(TestCase):
{'foo': 'bar'},
open_file=lambda n, m: fh, dump=raise_something
)
- # replace at the end removes u' from repr on Py2
- self.assertIn("'foo': 'bar'", fh.getvalue().replace("u'", "'"))
+ self.assertIn("'foo': 'bar'", fh.getvalue())
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
@@ -196,8 +200,8 @@ class test_retry_over_time(TestCase):
utils.count.return_value = range(10)
cb = Mock()
x = utils.retry_over_time(self.myfun, self.Predicate,
- errback=self.errback, interval_max=14,
- callback=cb)
+ errback=self.errback, callback=cb,
+ interval_max=14)
self.assertEqual(x, 42)
self.assertEqual(self.index, 9)
cb.assert_called_with()
@@ -287,8 +291,10 @@ class test_symbol_by_name(TestCase):
def test_returns_default(self):
default = object()
- self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz',
- default=default), default)
+ self.assertIs(
+ utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default),
+ default,
+ )
def test_no_default(self):
with self.assertRaises(ImportError):
@@ -302,17 +308,19 @@ class test_symbol_by_name(TestCase):
def test_package(self):
from kombu.entity import Exchange
- self.assertIs(utils.symbol_by_name('.entity:Exchange',
- package='kombu'), Exchange)
+ self.assertIs(
+ utils.symbol_by_name('.entity:Exchange', package='kombu'),
+ Exchange,
+ )
self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu'))
class test_ChannelPromise(TestCase):
def test_repr(self):
- self.assertIn(
- "'foo'",
+ self.assertEqual(
repr(utils.ChannelPromise(lambda: 'foo')),
+ "<promise: 'foo'>",
)
diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py
index 626050fc..185cc349 100644
--- a/kombu/tests/transport/test_amqplib.py
+++ b/kombu/tests/transport/test_amqplib.py
@@ -57,11 +57,10 @@ class test_Channel(amqplibCase):
self.assertFalse(self.channel.no_ack_consumers)
def test_prepare_message(self):
- x = self.channel.prepare_message(
+ self.assertTrue(self.channel.prepare_message(
'foobar', 10, 'application/data', 'utf-8',
properties={},
- )
- self.assertTrue(x)
+ ))
def test_message_to_python(self):
message = Mock()
diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py
index 2ba7156a..c34871d7 100644
--- a/kombu/tests/transport/test_base.py
+++ b/kombu/tests/transport/test_base.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from kombu import Connection, Consumer, Producer, Queue
from kombu.transport.base import Message, StdChannel, Transport
diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py
index 7bdad8f5..b1d7c0cd 100644
--- a/kombu/tests/transport/test_filesystem.py
+++ b/kombu/tests/transport/test_filesystem.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import tempfile
diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py
index 970681d7..d138e494 100644
--- a/kombu/tests/transport/test_memory.py
+++ b/kombu/tests/transport/test_memory.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import socket
diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py
index 21764c83..bd4e86bf 100644
--- a/kombu/tests/transport/test_pyamqp.py
+++ b/kombu/tests/transport/test_pyamqp.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import sys
@@ -49,11 +50,10 @@ class test_Channel(TestCase):
self.assertFalse(self.channel.no_ack_consumers)
def test_prepare_message(self):
- x = self.channel.prepare_message(
+ self.assertTrue(self.channel.prepare_message(
'foobar', 10, 'application/data', 'utf-8',
properties={},
- )
- self.assertTrue(x)
+ ))
def test_message_to_python(self):
message = Mock()
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 8cb8fab5..90e3a318 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import socket
import types
@@ -6,10 +7,10 @@ import types
from anyjson import dumps
from collections import defaultdict
from itertools import count
+from Queue import Empty, Queue as _Queue
from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.exceptions import InconsistencyError, VersionMismatch
-from kombu.five import Empty, Queue as _Queue
from kombu.utils import eventio # patch poll
from kombu.tests.utils import TestCase
@@ -128,10 +129,10 @@ class Client(object):
class _socket(object):
blocking = True
- filenos = count(30)
+ next_fileno = count(30).next
def __init__(self, *args):
- self._fileno = next(self.filenos)
+ self._fileno = self.next_fileno()
self.data = []
def fileno(self):
@@ -264,24 +265,28 @@ class test_Channel(TestCase):
self.assertDictEqual(
self.channel._handle_message(
self.channel.subclient,
- ['pmessage', 'pattern', 'channel', 'data']
+ ['pmessage', 'pattern', 'channel', 'data'],
),
- {'type': 'pmessage',
- 'pattern': 'pattern',
- 'channel': 'channel',
- 'data': 'data'},
+ {
+ 'type': 'pmessage',
+ 'pattern': 'pattern',
+ 'channel': 'channel',
+ 'data': 'data',
+ },
)
def test_handle_message(self):
self.assertDictEqual(
self.channel._handle_message(
self.channel.subclient,
- ['type', 'channel', 'data']
+ ['type', 'channel', 'data'],
),
- {'type': 'type',
- 'pattern': None,
- 'channel': 'channel',
- 'data': 'data'},
+ {
+ 'type': 'type',
+ 'pattern': None,
+ 'channel': 'channel',
+ 'data': 'data',
+ },
)
def test_brpop_start_but_no_queues(self):
@@ -290,9 +295,8 @@ class test_Channel(TestCase):
def test_receive(self):
s = self.channel.subclient = Mock()
self.channel._fanout_to_queue['a'] = 'b'
- s.parse_response.return_value = [
- 'message', 'a', dumps({'hello': 'world'}),
- ]
+ s.parse_response.return_value = ['message', 'a',
+ dumps({'hello': 'world'})]
payload, queue = self.channel._receive()
self.assertDictEqual(payload, {'hello': 'world'})
self.assertEqual(queue, 'b')
diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py
index f3d7d239..27cc4323 100644
--- a/kombu/tests/transport/test_sqlalchemy.py
+++ b/kombu/tests/transport/test_sqlalchemy.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from mock import patch
from nose import SkipTest
diff --git a/kombu/tests/transport/test_transport.py b/kombu/tests/transport/test_transport.py
index 608b9767..11cdcbe3 100644
--- a/kombu/tests/transport/test_transport.py
+++ b/kombu/tests/transport/test_transport.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from mock import patch
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index 571c6e17..28714cc4 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import warnings
@@ -8,7 +9,9 @@ from kombu import Connection
from kombu.exceptions import StdChannelError
from kombu.transport import virtual
from kombu.utils import uuid
+from kombu.compression import compress
+from kombu.tests.compat import catch_warnings
from kombu.tests.utils import TestCase
from kombu.tests.utils import Mock, redirect_stdouts
@@ -64,7 +67,7 @@ class test_QoS(TestCase):
self.q.append(i + 1, uuid())
self.assertFalse(self.q.can_consume())
- tag1 = next(iter(self.q._delivered))
+ tag1 = iter(self.q._delivered).next()
self.q.ack(tag1)
self.assertTrue(self.q.can_consume())
@@ -120,13 +123,15 @@ class test_Message(TestCase):
def test_serializable(self):
c = client().channel()
- data = c.prepare_message('the quick brown fox...')
+ body, content_type = compress('the quick brown fox...', 'gzip')
+ data = c.prepare_message(body, headers={'compression': content_type})
tag = data['properties']['delivery_tag'] = uuid()
message = c.message_to_python(data)
dict_ = message.serializable()
self.assertEqual(dict_['body'],
'the quick brown fox...'.encode('utf-8'))
self.assertEqual(dict_['properties']['delivery_tag'], tag)
+ self.assertFalse('compression' in dict_['headers'])
class test_AbstractChannel(TestCase):
@@ -304,8 +309,8 @@ class test_Channel(TestCase):
consumer_tag = uuid()
- c.basic_consume(n + '2', False, consumer_tag=consumer_tag,
- callback=lambda *a: None)
+ c.basic_consume(n + '2', False,
+ consumer_tag=consumer_tag, callback=lambda *a: None)
self.assertIn(n + '2', c._active_queues)
r2, _ = c.drain_events()
r2 = c.message_to_python(r2)
@@ -406,7 +411,7 @@ class test_Channel(TestCase):
def test_lookup__undeliverable(self, n='test_lookup__undeliverable'):
warnings.resetwarnings()
- with warnings.catch_warnings(record=True) as log:
+ with catch_warnings(record=True) as log:
self.assertListEqual(
self.channel._lookup(n, n, 'ae.undeliver'),
['ae.undeliver'],
diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py
index d59aaa06..90127ba0 100644
--- a/kombu/tests/transport/virtual/test_exchange.py
+++ b/kombu/tests/transport/virtual/test_exchange.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from kombu import Connection
from kombu.transport.virtual import exchange
@@ -68,8 +69,10 @@ class test_Fanout(ExchangeCase):
class test_Topic(ExchangeCase):
type = exchange.TopicExchange
- table = [('stock.#', None, 'rFoo'),
- ('stock.us.*', None, 'rBar')]
+ table = [
+ ('stock.#', None, 'rFoo'),
+ ('stock.us.*', None, 'rBar'),
+ ]
def setUp(self):
super(test_Topic, self).setUp()
@@ -125,18 +128,24 @@ class test_ExchangeType(ExchangeCase):
)
def test_equivalent(self):
- e1 = dict(type='direct',
- durable=True,
- auto_delete=True,
- arguments={})
+ e1 = dict(
+ type='direct',
+ durable=True,
+ auto_delete=True,
+ arguments={},
+ )
self.assertTrue(
- self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}),
+ )
self.assertFalse(
- self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}),
+ )
self.assertFalse(
- self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}),
+ )
self.assertFalse(
- self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}),
+ )
self.assertFalse(
self.e.equivalent(e1, 'eFoo', 'direct', True, True,
{'expires': 3000}),
diff --git a/kombu/tests/transport/virtual/test_scheduling.py b/kombu/tests/transport/virtual/test_scheduling.py
index 1a6c32bc..6f2d24f1 100644
--- a/kombu/tests/transport/virtual/test_scheduling.py
+++ b/kombu/tests/transport/virtual/test_scheduling.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from kombu.transport.virtual.scheduling import FairCycle
diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py
index e030810e..ccf4ec08 100644
--- a/kombu/tests/utilities/test_amq_manager.py
+++ b/kombu/tests/utilities/test_amq_manager.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
from mock import patch
diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py
index d364400f..dbe8a2af 100644
--- a/kombu/tests/utilities/test_debug.py
+++ b/kombu/tests/utilities/test_debug.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import
+from __future__ import with_statement
import logging
diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py
index 5d42e026..b2942b67 100644
--- a/kombu/tests/utilities/test_encoding.py
+++ b/kombu/tests/utilities/test_encoding.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-from __future__ import absolute_import, unicode_literals
+from __future__ import absolute_import
+from __future__ import with_statement
import sys
@@ -45,16 +46,16 @@ class test_encoding_utils(TestCase):
def test_str_to_bytes(self):
with clean_encoding() as e:
- self.assertIsInstance(e.str_to_bytes('foobar'), str)
+ self.assertIsInstance(e.str_to_bytes(u'foobar'), str)
self.assertIsInstance(e.str_to_bytes('foobar'), str)
def test_from_utf8(self):
with clean_encoding() as e:
- self.assertIsInstance(e.from_utf8('foobar'), str)
+ self.assertIsInstance(e.from_utf8(u'foobar'), str)
def test_default_encode(self):
with clean_encoding() as e:
- self.assertTrue(e.default_encode(b'foo'))
+ self.assertTrue(e.default_encode('foo'))
class test_safe_str(TestCase):
@@ -63,10 +64,10 @@ class test_safe_str(TestCase):
self.assertEqual(safe_str('foo'), 'foo')
def test_when_unicode(self):
- self.assertIsInstance(safe_str('foo'), str)
+ self.assertIsInstance(safe_str(u'foo'), str)
def test_when_containing_high_chars(self):
- s = 'The quiæk fåx jømps øver the lazy dåg'
+ s = u'The quiæk fåx jømps øver the lazy dåg'
res = safe_str(s)
self.assertIsInstance(res, str)
diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py
index 82830dad..a8f92a37 100644
--- a/kombu/tests/utils.py
+++ b/kombu/tests/utils.py
@@ -1,10 +1,12 @@
from __future__ import absolute_import
+import __builtin__
import os
import sys
import types
from functools import wraps
+from StringIO import StringIO
import mock
@@ -16,8 +18,6 @@ try:
except AttributeError:
import unittest2 as unittest # noqa
-from kombu.five import StringIO, builtins, string_t, module_name_t
-
class TestCase(unittest.TestCase):
@@ -76,8 +76,8 @@ def module_exists(*modules):
@wraps(fun)
def __inner(*args, **kwargs):
for module in modules:
- if isinstance(module, string_t):
- module = types.ModuleType(module_name_t(module))
+ if isinstance(module, basestring):
+ module = types.ModuleType(module)
sys.modules[module.__name__] = module
try:
return fun(*args, **kwargs)
@@ -95,19 +95,19 @@ def mask_modules(*modnames):
@wraps(fun)
def __inner(*args, **kwargs):
- realimport = builtins.__import__
+ realimport = __builtin__.__import__
def myimp(name, *args, **kwargs):
if name in modnames:
- raise ImportError('No module named {0}'.format(name))
+ raise ImportError('No module named %s' % name)
else:
return realimport(name, *args, **kwargs)
- builtins.__import__ = myimp
+ __builtin__.__import__ = myimp
try:
return fun(*args, **kwargs)
finally:
- builtins.__import__ = realimport
+ __builtin__.__import__ = realimport
return __inner
return _inner
@@ -120,7 +120,7 @@ def skip_if_environ(env_var_name):
@wraps(fun)
def _skips_if_environ(*args, **kwargs):
if os.environ.get(env_var_name):
- raise SkipTest('SKIP {0}: {1} set'.format(
+ raise SkipTest('SKIP %s: %s set\n' % (
fun.__name__, env_var_name))
return fun(*args, **kwargs)
@@ -135,7 +135,7 @@ def skip_if_module(module):
def _skip_if_module(*args, **kwargs):
try:
__import__(module)
- raise SkipTest('SKIP {0}: {1} available'.format(
+ raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
except ImportError:
pass
@@ -151,7 +151,7 @@ def skip_if_not_module(module):
try:
__import__(module)
except ImportError:
- raise SkipTest('SKIP {0}: {1} available'.format(
+ raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
return fun(*args, **kwargs)
return _skip_if_not_module
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 12380702..9432ba12 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -240,6 +240,13 @@ class Channel(virtual.Channel):
return payload
raise Empty()
+ def _restore(self, message,
+ unwanted_delivery_info=('sqs_message', 'sqs_queue')):
+ for unwanted_key in unwanted_delivery_info:
+ # Remove objects that aren't JSON serializable (Issue #1108).
+ message.delivery_info.pop(unwanted_key, None)
+ return super(Channel, self)._restore(message)
+
def basic_ack(self, delivery_tag):
delivery_info = self.qos.get(delivery_tag).delivery_info
try:
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index f3dbf58d..b9012a2d 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -131,9 +131,9 @@ class QoS(object):
def append(self, message, delivery_tag):
"""Append message to transactional state."""
- self._delivered[delivery_tag] = message
if self._dirty:
self._flush()
+ self._delivered[delivery_tag] = message
def get(self, delivery_tag):
return self._delivered[delivery_tag]
@@ -238,7 +238,7 @@ class Message(base.Message):
'properties': props,
'content-type': self.content_type,
'content-encoding': self.content_encoding,
- 'headers': self.headers}
+ 'headers': headers}
class AbstractChannel(object):
@@ -510,10 +510,13 @@ class Channel(AbstractChannel, base.StdChannel):
pass
self.connection._callbacks.pop(queue, None)
- def basic_get(self, queue, **kwargs):
+ def basic_get(self, queue, no_ack=False, **kwargs):
"""Get message by direct access (synchronous)."""
try:
- return self._get(queue)
+ message = self.Message(self, self._get(queue))
+ if not no_ack:
+ self.qos.append(message, message.delivery_tag)
+ return message
except Empty:
pass
diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py
index 30419ae6..08ee3939 100644
--- a/kombu/utils/amq_manager.py
+++ b/kombu/utils/amq_manager.py
@@ -8,7 +8,7 @@ def get_manager(client, hostname=None, port=None, userid=None,
def get(name, val, default):
return (val if val is not None
- else opt.get('manager_%s' % name)
+ else opt('manager_%s' % name)
or getattr(client, name, None) or default)
host = get('hostname', hostname, 'localhost')
diff --git a/pavement.py b/pavement.py
index f15cbe68..57f8ce97 100644
--- a/pavement.py
+++ b/pavement.py
@@ -7,7 +7,7 @@ from paver.setuputils import setup # noqa
PYCOMPILE_CACHES = ['*.pyc', '*$py.class']
options(
- sphinx=Bunch(builddir='.build'),
+ sphinx=Bunch(builddir='.build'),
)
@@ -91,9 +91,15 @@ def readme(options):
])
def bump(options):
s = "-- '%s'" % (options.custom, ) \
+<<<<<<< HEAD
if getattr(options, 'custom', None) else ''
sh('extra/release/bump_version.py \
kombu/__init__.py README.rst %s' % (s, ))
+=======
+ if getattr(options, "custom", None) else ""
+ sh("extra/release/bump_version.py \
+ kombu/__init__.py README.rst %s" % (s, ))
+>>>>>>> 2.5
@task
@@ -129,6 +135,7 @@ def flake8(options):
}{exit $FOUND_FLAKE;
'""" % (complexity, migrations_path), ignore_error=noerror)
+
@task
@cmdopts([
('noerror', 'E', 'Ignore errors'),
diff --git a/setup.py b/setup.py
index cf4c9b61..c53190f3 100644
--- a/setup.py
+++ b/setup.py
@@ -86,8 +86,9 @@ for dirpath, dirnames, filenames in os.walk(src_dir):
if filename.endswith('.py'):
packages.append('.'.join(fullsplit(dirpath)))
else:
- data_files.append([dirpath, [os.path.join(dirpath, f) for f in
- filenames]])
+ data_files.append(
+ [dirpath, [os.path.join(dirpath, f) for f in filenames]],
+ )
if os.path.exists('README.rst'):
long_description = codecs.open('README.rst', 'r', 'utf-8').read()