summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-09-10 14:53:36 +0100
committerAsk Solem <ask@celeryproject.org>2013-09-10 14:53:36 +0100
commitef4062681877811734e6cb3aaf087016f9eb53f4 (patch)
tree392332465013beff9fef4258448b7b8f580748eb
parent91c6cd6cb50eddf065431930da96716ac751f9cd (diff)
parentd36af8e23708753cff1bdc2719fa99837d2f6eee (diff)
downloadkombu-ef4062681877811734e6cb3aaf087016f9eb53f4.tar.gz
Merge branch '2.5'
Conflicts: Changelog README.rst kombu/__init__.py kombu/messaging.py kombu/mixins.py kombu/tests/test_serialization.py kombu/tests/utilities/test_encoding.py kombu/transport/filesystem.py kombu/transport/pyamqp.py requirements/default.txt setup.py tox.ini
-rw-r--r--AUTHORS2
-rw-r--r--Changelog36
-rw-r--r--kombu/entity.py1
-rw-r--r--kombu/messaging.py2
-rw-r--r--kombu/mixins.py9
-rw-r--r--kombu/serialization.py2
-rw-r--r--kombu/tests/test_serialization.py21
-rw-r--r--kombu/tests/utilities/test_encoding.py27
-rw-r--r--kombu/tests/utils.py4
-rw-r--r--kombu/transport/filesystem.py3
-rw-r--r--kombu/transport/pyamqp.py3
11 files changed, 89 insertions, 21 deletions
diff --git a/AUTHORS b/AUTHORS
index 47c87243..11e9cf60 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -20,6 +20,7 @@ C Anthony Risinger <anthony+corvisa.com@xtfx.me>
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com>
+Corentin Ardeois <cardeois@iweb.com>
Dan LaMotte <lamotte85@gmail.com>
Dan McGee <dan@archlinux.org>
Dane Guempel <daneguempel@gmail.com>
@@ -71,6 +72,7 @@ Rob Ottaway <robottaway@gmail.com>
Rumyana Neykova <rumi.neykova@gmail.com>
Rune Halvorsen <runeh@opera.com>
Ryan Petrello <lists@ryanpetrello.com>
+Sascha Peilicke <saschpe@gmx.de>
Scott Lyons <scottalyons@gmail.com>
Sean Bleier <sebleier@gmail.com>
Sean Creeley <sean.creeley@gmail.com>
diff --git a/Changelog b/Changelog
index 2e5faab2..61412da6 100644
--- a/Changelog
+++ b/Changelog
@@ -19,6 +19,41 @@
>>> from kombu import enable_insecure_serializers
>>> enable_insecure_serializers()
+.. _version-2.5.14:
+
+2.5.14
+======
+:release-date: 2014-08-23 17:00 P.M BST
+
+- safe_str did not work properly resulting in
+ :exc:`UnicodeDecodeError` (Issue #248).
+
+.. _version-2.5.13:
+
+2.5.13
+======
+:release-date: 2013-08-16 16:00 P.M BST
+
+- Now depends on :mod:`amqp` 1.0.13
+
+- Fixed typo in Django functional tests.
+
+- safe_str now returns Unicode in Python 2.x
+
+ Fix contributed by Germán M. Bravo.
+
+- amqp: Transport options are now merged with arguments
+ supplied to the connection.
+
+- Tests no longer depends on distribute, which was deprecated
+ and merged back into setuptools.
+
+ Fix contributed by Sascha Peilicke.
+
+- ConsumerMixin now also restarts on channel related errors.
+
+ Fix contributed by Corentin Ardeois.
+
.. _version-2.5.12:
2.5.12
@@ -30,7 +65,6 @@
- Fixed test suite errors on Python 3.
- Fixed msgpack test failures.
->>>>>>> 2.5
.. _version-2.5.11:
diff --git a/kombu/entity.py b/kombu/entity.py
index 496b1eab..b04554b2 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -553,6 +553,7 @@ class Queue(MaybeChannelBound):
:keyword no_ack: If enabled the broker will automatically
ack messages.
+ :keyword accept: Custom list of accepted content types.
This method provides direct access to the messages in a
queue using a synchronous dialogue, designed for
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 6719fc0b..0341737c 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -13,7 +13,7 @@ from .compression import compress
from .connection import maybe_channel, is_connection
from .entity import Exchange, Queue, DELIVERY_MODES
from .five import int_types, text_t, values
-from .serialization import encode, registry, prepare_accept_content
+from .serialization import encode, prepare_accept_content
from .utils import ChannelPromise, maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
diff --git a/kombu/mixins.py b/kombu/mixins.py
index 5ba945f1..3f295740 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -160,14 +160,17 @@ class ConsumerMixin(object):
yield
def run(self, _tokens=1):
+ restart_limit = self.restart_limit
+ errors = (self.connection.connection_errors +
+ self.connection.channel_errors)
while not self.should_stop:
try:
- if self.restart_limit.can_consume(_tokens):
+ if restart_limit.can_consume(_tokens):
for _ in self.consume(limit=None):
pass
else:
- sleep(self.restart_limit.expected_time(_tokens))
- except self.connection.connection_errors:
+ sleep(restart_limit.expected_time(_tokens))
+ except errors:
warn('Connection to broker lost. '
'Trying to re-establish the connection...')
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 2e2b2367..33d19775 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -362,7 +362,7 @@ def register_msgpack():
'msgpack', dumps, loads,
content_type='application/x-msgpack',
content_encoding='binary')
- except ImportError:
+ except (ImportError, ValueError):
def not_available(*args, **kwargs):
"""In case a client receives a msgpack message, but yaml
diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py
index a66e27f0..bbd1fddb 100644
--- a/kombu/tests/test_serialization.py
+++ b/kombu/tests/test_serialization.py
@@ -15,6 +15,7 @@ from kombu.serialization import (
)
from kombu.utils.encoding import str_to_bytes
+from .five import text_t
from .utils import TestCase
from .utils import mask_modules, skip_if_not_module
@@ -58,8 +59,6 @@ unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
msgpack_py_data = dict(py_data)
-# msgpack only supports tuples
-#msgpack_py_data['list'] = msgpack_py_data['list']
# Unicode chars are lost in transmit :(
msgpack_py_data['unicode'] = 'Th quick brown fox jumps over th lazy dog'
msgpack_data = b64decode(str_to_bytes("""\
@@ -183,17 +182,25 @@ class test_Serialization(TestCase):
),
)
- @skip_if_not_module('msgpack')
+ @skip_if_not_module('msgpack', (ImportError, ValueError))
def test_msgpack_decode(self):
register_msgpack()
+ res = registry.decode(msgpack_data,
+ content_type='application/x-msgpack',
+ content_encoding='binary')
+ if sys.version_info[0] < 3:
+ for k, v in res.items():
+ if isinstance(v, text_t):
+ res[k] = v.encode()
+ if isinstance(v, (list, tuple)):
+ res[k] = [i.encode() for i in v]
+ print('RES: %r' % (res, ))
self.assertEqual(
msgpack_py_data,
- registry.decode(msgpack_data,
- content_type='application/x-msgpack',
- content_encoding='binary'),
+ res,
)
- @skip_if_not_module('msgpack')
+ @skip_if_not_module('msgpack', (ImportError, ValueError))
def test_msgpack_encode(self):
register_msgpack()
self.assertEqual(
diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py
index 56348d23..7da9c379 100644
--- a/kombu/tests/utilities/test_encoding.py
+++ b/kombu/tests/utilities/test_encoding.py
@@ -60,16 +60,35 @@ class test_encoding_utils(TestCase):
class test_safe_str(TestCase):
- def test_when_str(self):
+ def setUp(self):
+ self._cencoding = patch('sys.getdefaultencoding')
+ self._encoding = self._cencoding.__enter__()
+ self._encoding.return_value = 'ascii'
+
+ def tearDown(self):
+ self._cencoding.__exit__()
+
+ def test_when_bytes(self):
self.assertEqual(safe_str('foo'), 'foo')
def test_when_unicode(self):
self.assertIsInstance(safe_str('foo'), string_t)
+ def test_when_encoding_utf8(self):
+ with patch('sys.getdefaultencoding') as encoding:
+ encoding.return_value = 'utf-8'
+ self.assertEqual(default_encoding(), 'utf-8')
+ s = u'The quiæk fåx jømps øver the lazy dåg'
+ res = safe_str(s)
+ self.assertIsInstance(res, str)
+
def test_when_containing_high_chars(self):
- s = 'The quiæk fåx jømps øver the lazy dåg'
- res = safe_str(s)
- self.assertIsInstance(res, string_t)
+ with patch('sys.getdefaultencoding') as encoding:
+ encoding.return_value = 'ascii'
+ s = u'The quiæk fåx jømps øver the lazy dåg'
+ res = safe_str(s)
+ self.assertIsInstance(res, str)
+ self.assertEqual(len(s), len(res))
def test_when_not_string(self):
o = object()
diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py
index f6724308..d0e1db82 100644
--- a/kombu/tests/utils.py
+++ b/kombu/tests/utils.py
@@ -166,13 +166,13 @@ def skip_if_module(module):
return _wrap_test
-def skip_if_not_module(module):
+def skip_if_not_module(module, import_errors=(ImportError, )):
def _wrap_test(fun):
@wraps(fun)
def _skip_if_not_module(*args, **kwargs):
try:
__import__(module)
- except ImportError:
+ except import_errors:
raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
return fun(*args, **kwargs)
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index 3b248bc8..697e0f9a 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -19,7 +19,7 @@ from . import virtual
from kombu.exceptions import StdConnectionError, StdChannelError
from kombu.five import Empty
from kombu.utils import cached_property
-from kombu.utils.encoding import str_to_bytes, bytes_to_str
+from kombu.utils.encoding import bytes_to_str, str_to_bytes
VERSION = (1, 0, 0)
__version__ = ".".join(map(str, VERSION))
@@ -72,7 +72,6 @@ class Channel(virtual.Channel):
try:
f = open(filename, 'wb')
lock(f, LOCK_EX)
- dumps(payload)
f.write(str_to_bytes(dumps(payload)))
except (IOError, OSError):
raise StdChannelError(
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index 4042d21b..e77f290b 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -88,6 +88,9 @@ class Transport(base.Transport):
self.client = client
self.default_port = kwargs.get('default_port') or self.default_port
+ def driver_version(self):
+ return amqp.__version__
+
def create_channel(self, connection):
return connection.channel()