From 917df5da62a88ff9734b05a70b4cec4b1269a2d9 Mon Sep 17 00:00:00 2001 From: Randy Barlow Date: Mon, 21 Apr 2014 14:13:30 -0400 Subject: 344 - added the skip decorator to three tests. Three tests were missing the @skip_if_not_module('redis') decorator. https://github.com/celery/kombu/issues/344 --- kombu/tests/transport/test_redis.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'kombu') diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 48f7c6be..fa1782b2 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -616,10 +616,12 @@ class test_Channel(Case): self.channel.connection.client.virtual_host = 'dwqeq' self.channel._connparams() + @skip_if_not_module('redis') def test_connparams_allows_slash_in_db(self): self.channel.connection.client.virtual_host = '/123' self.assertEqual(self.channel._connparams()['db'], 123) + @skip_if_not_module('redis') def test_connparams_db_can_be_int(self): self.channel.connection.client.virtual_host = 124 self.assertEqual(self.channel._connparams()['db'], 124) @@ -630,6 +632,7 @@ class test_Channel(Case): redis.Channel._new_queue(self.channel, 'elaine', auto_delete=True) self.assertIn('elaine', self.channel.auto_delete_queues) + @skip_if_not_module('redis') def test_connparams_regular_hostname(self): self.channel.connection.client.hostname = 'george.vandelay.com' self.assertEqual( -- cgit v1.2.1 From fce718e9ae58d047ba743d492a7cd05a630b2692 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Sat, 19 Apr 2014 00:46:05 +0000 Subject: epoll unregister catches fileno TypeError --- kombu/utils/eventio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index ed9ec06c..e4961cdc 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -83,7 +83,7 @@ class _epoll(Poller): def unregister(self, fd): try: self._epoll.unregister(fd) - except (socket.error, ValueError, KeyError): + except (socket.error, ValueError, KeyError, TypeError): pass except (IOError, OSError) as exc: if get_errno(exc) != errno.ENOENT: -- cgit v1.2.1 From ffd25eb6bceabbb9410040be42099f1f03fb716b Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 29 Apr 2014 11:03:11 +0100 Subject: Extra check for #342 --- kombu/transport/redis.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'kombu') diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 3ee049cb..c3f6decd 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -255,10 +255,9 @@ class MultiChannelPoller(object): self._channels.discard(channel) def _on_connection_disconnect(self, connection): - try: - self.poller.unregister(connection._sock) - except AttributeError: - pass + sock = getattr(connection, '_sock', None) + if sock is not None: + self.poller.unregister(sock) def _register(self, channel, client, type): if (channel, client, type) in self._chan_to_sock: -- cgit v1.2.1 From eae4dba4f298501a1b12119c3161912efc8edbb4 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 6 May 2014 14:34:13 +0100 Subject: Bumps version to 3.0.16 and updates Changelog --- kombu/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/__init__.py b/kombu/__init__.py index f6b73012..deac30c4 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -7,7 +7,7 @@ version_info_t = namedtuple( 'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'), ) -VERSION = version_info_t(3, 0, 15, '', '') +VERSION = version_info_t(3, 0, 16, '', '') __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' -- cgit v1.2.1 From 3dd3d71eb6ecb422ba73176ce3ed6ae62337d4f0 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 8 May 2014 09:06:49 +0400 Subject: Priority support for MongoDB transport. Transport table update. Priority conversion unification. --- kombu/tests/transport/virtual/test_base.py | 16 ++++++++++++++++ kombu/transport/beanstalk.py | 2 +- kombu/transport/mongodb.py | 9 ++++++--- kombu/transport/redis.py | 7 ++----- kombu/transport/virtual/__init__.py | 20 +++++++++++++++++++- kombu/transport/zookeeper.py | 8 +------- 6 files changed, 45 insertions(+), 17 deletions(-) (limited to 'kombu') diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index d249c4e7..bdc7382b 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -515,6 +515,22 @@ class test_Channel(Case): with self.assertRaises(ChannelError): self.channel.queue_declare(queue='21wisdjwqe', passive=True) + def test_get_message_priority(self): + def _message(priority): + return self.channel.prepare_message('the message with priority', + priority=priority) + + self.assertEqual(self.channel._get_message_priority(_message(5)), + 5) + self.assertEqual(self.channel._get_message_priority(_message(self.channel.min_priority - 10)), + self.channel.min_priority) + self.assertEqual(self.channel._get_message_priority(_message(self.channel.max_priority + 10)), + self.channel.max_priority) + self.assertEqual(self.channel._get_message_priority(_message('foobar')), + self.channel.default_priority) + self.assertEqual(self.channel._get_message_priority(_message(2), reverse=True), + self.channel.max_priority - 2) + class test_Transport(Case): diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index 9dff8b49..544fd438 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -44,7 +44,7 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): extra = {} - priority = message['properties']['delivery_info']['priority'] + priority = self._get_message_priority(message) ttr = message['properties'].get('ttr') if ttr is not None: extra['ttr'] = ttr diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 78af0f9f..5f726d09 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -99,7 +99,8 @@ class Channel(virtual.Channel): else: msg = self.get_messages().find_and_modify( query={'queue': queue}, - sort={'_id': pymongo.ASCENDING}, + sort=[('priority', pymongo.ASCENDING), + ('_id', pymongo.ASCENDING)], remove=True, ) @@ -116,7 +117,9 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): self.get_messages().insert({'payload': dumps(message), - 'queue': queue}) + 'queue': queue, + 'priority': self._get_message_priority(message, + reverse=True)}) def _purge(self, queue): size = self._size(queue) @@ -202,7 +205,7 @@ class Channel(virtual.Channel): def _ensure_indexes(self): '''Ensure indexes on collections.''' self.get_messages().ensure_index( - [('queue', 1), ('_id', 1)], background=True, + [('queue', 1), ('priority', 1), ('_id', 1)], background=True, ) self.get_broadcast().ensure_index([('queue', 1)]) self.get_routing().ensure_index([('queue', 1), ('exchange', 1)]) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 3ee049cb..164c8c2c 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -662,11 +662,8 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): """Deliver message.""" - try: - pri = max(min(int( - message['properties']['delivery_info']['priority']), 9), 0) - except (TypeError, ValueError, KeyError): - pri = 0 + pri = self._get_message_priority(message) + with self.conn_or_acquire() as client: client.lpush(self._q_for_pri(queue, pri), dumps(message)) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index cb844de9..2e34cab7 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -367,6 +367,11 @@ class Channel(AbstractChannel, base.StdChannel): # List of options to transfer from :attr:`transport_options`. from_transport_options = ('body_encoding', 'deadletter_queue') + # Priority defaults + default_priority = 0 + min_priority = 0 + max_priority = 9 + def __init__(self, connection, **kwargs): self.connection = connection self._consumers = set() @@ -653,7 +658,7 @@ class Channel(AbstractChannel, base.StdChannel): """Prepare message data.""" properties = properties or {} info = properties.setdefault('delivery_info', {}) - info['priority'] = priority or 0 + info['priority'] = priority or self.default_priority return {'body': body, 'content-encoding': content_encoding, @@ -723,6 +728,19 @@ class Channel(AbstractChannel, base.StdChannel): self._reset_cycle() return self._cycle + def _get_message_priority(self, message, reverse=False): + """Gets priority from message and converts it to the bounds: [0, 9]. + Higher value has more priority. + """ + try: + priority = max(min(int(message['properties']['delivery_info']['priority']), + self.max_priority), + self.min_priority) + except (TypeError, ValueError, KeyError): + priority = self.default_priority + + return (self.max_priority - priority) if reverse else priority + class Management(base.Management): diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index 2d1c8abc..6645507a 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -37,7 +37,6 @@ from kombu.utils.encoding import bytes_to_str from . import virtual -MAX_PRIORITY = 9 try: import kazoo @@ -103,13 +102,8 @@ class Channel(virtual.Channel): return queue def _put(self, queue, message, **kwargs): - try: - priority = message['properties']['delivery_info']['priority'] - except KeyError: - priority = 0 - queue = self._get_queue(queue) - queue.put(dumps(message), priority=(MAX_PRIORITY - priority)) + queue.put(dumps(message), priority=self._get_message_priority(message, reverse=True)) def _get(self, queue): queue = self._get_queue(queue) -- cgit v1.2.1 From 816627fa17e3b0618c329bb8de5e3fbf35312a7a Mon Sep 17 00:00:00 2001 From: Roger Hu Date: Thu, 8 May 2014 00:27:32 +0000 Subject: Be selective about how file descriptors are removed since they may be reused for a different purpose. Kombu was removing them after they were being reused by another worker process. --- kombu/async/hub.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) (limited to 'kombu') diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 673a60ac..306fdd31 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -287,20 +287,26 @@ class Hub(object): to_consolidate.append(fileno) continue cb = cbargs = None - try: - if event & READ: + + if event & READ: + try: cb, cbargs = readers[fileno] - elif event & WRITE: + except KeyError: + self.remove_reader(fileno) + continue + elif event & WRITE: + try: cb, cbargs = writers[fileno] - elif event & ERR: - try: - cb, cbargs = (readers.get(fileno) or - writers.get(fileno)) - except TypeError: - pass - except (KeyError, Empty): - hub_remove(fileno) - continue + except KeyError: + self.remove_writer(fileno) + continue + elif event & ERR: + try: + cb, cbargs = (readers.get(fileno) or + writers.get(fileno)) + except TypeError: + pass + if cb is None: continue if isinstance(cb, generator): -- cgit v1.2.1 From 0b05298377488e411e3204d0e93bbad62b1fd68a Mon Sep 17 00:00:00 2001 From: Roger Hu Date: Thu, 8 May 2014 00:27:32 +0000 Subject: Be selective about how file descriptors are removed since they may be reused for a different purpose. Kombu was removing them after they were being reused by another worker process. --- kombu/async/hub.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) (limited to 'kombu') diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 673a60ac..306fdd31 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -287,20 +287,26 @@ class Hub(object): to_consolidate.append(fileno) continue cb = cbargs = None - try: - if event & READ: + + if event & READ: + try: cb, cbargs = readers[fileno] - elif event & WRITE: + except KeyError: + self.remove_reader(fileno) + continue + elif event & WRITE: + try: cb, cbargs = writers[fileno] - elif event & ERR: - try: - cb, cbargs = (readers.get(fileno) or - writers.get(fileno)) - except TypeError: - pass - except (KeyError, Empty): - hub_remove(fileno) - continue + except KeyError: + self.remove_writer(fileno) + continue + elif event & ERR: + try: + cb, cbargs = (readers.get(fileno) or + writers.get(fileno)) + except TypeError: + pass + if cb is None: continue if isinstance(cb, generator): -- cgit v1.2.1 From 55197ee34198b2ccbe8ccc57bac336b9dd42e31b Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 19 May 2014 22:27:36 +0100 Subject: master now depends on Python 2.7 --- kombu/__init__.py | 6 +++--- kombu/abstract.py | 6 +++--- kombu/async/timer.py | 3 +-- kombu/connection.py | 2 +- kombu/five.py | 15 +++++---------- kombu/pidbox.py | 6 +++--- kombu/serialization.py | 5 +++-- kombu/tests/case.py | 3 ++- kombu/tests/test_messaging.py | 2 +- kombu/tests/transport/test_base.py | 4 ++-- kombu/tests/utils/test_utils.py | 20 ++------------------ kombu/transport/SLMQ.py | 5 +++-- kombu/transport/SQS.py | 7 ++++--- kombu/transport/redis.py | 4 ++-- kombu/transport/virtual/__init__.py | 6 +++--- kombu/utils/__init__.py | 18 +----------------- kombu/utils/compat.py | 34 ---------------------------------- kombu/utils/eventio.py | 4 ++-- kombu/utils/url.py | 4 +--- 19 files changed, 42 insertions(+), 112 deletions(-) (limited to 'kombu') diff --git a/kombu/__init__.py b/kombu/__init__.py index deac30c4..da1b6b2c 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -7,7 +7,7 @@ version_info_t = namedtuple( 'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'), ) -VERSION = version_info_t(3, 0, 16, '', '') +VERSION = version_info_t(3, 1, 0, 'a1', '') __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' @@ -19,8 +19,8 @@ __docformat__ = 'restructuredtext en' import os import sys -if sys.version_info < (2, 6): # pragma: no cover - raise Exception('Kombu 3.1 requires Python versions 2.6 or later.') +if sys.version_info < (2, 7): # pragma: no cover + raise Exception('Kombu 3.1 requires Python versions 2.7 or later.') STATICA_HACK = True globals()['kcah_acitats'[::-1].upper()] = False diff --git a/kombu/abstract.py b/kombu/abstract.py index 6dff8486..31bb547e 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -42,9 +42,9 @@ class Object(object): if recurse and isinstance(obj, Object): return obj.as_dict(recurse=True) return type(obj) if type else obj - return dict( - (attr, f(getattr(self, attr), type)) for attr, type in self.attrs - ) + return { + attr: f(getattr(self, attr), type) for attr, type in self.attrs + } def __reduce__(self): return unpickle_dict, (self.__class__, self.as_dict()) diff --git a/kombu/async/timer.py b/kombu/async/timer.py index 00f54123..09b6cf76 100644 --- a/kombu/async/timer.py +++ b/kombu/async/timer.py @@ -19,7 +19,6 @@ from weakref import proxy as weakrefproxy from kombu.five import monotonic from kombu.log import get_logger -from kombu.utils.compat import timedelta_seconds try: from pytz import utc @@ -41,7 +40,7 @@ def to_timestamp(d, default_timezone=utc): if isinstance(d, datetime): if d.tzinfo is None: d = d.replace(tzinfo=default_timezone) - return timedelta_seconds(d - EPOCH) + return max((d - EPOCH).total_seconds(), 0) return d diff --git a/kombu/connection.py b/kombu/connection.py index 85b8f5e9..7ddbf772 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -10,6 +10,7 @@ from __future__ import absolute_import import os import socket +from collections import OrderedDict from contextlib import contextmanager from functools import partial from itertools import count, cycle @@ -26,7 +27,6 @@ from .five import Empty, range, string_t, text_t, LifoQueue as _LifoQueue from .log import get_logger from .transport import get_transport_cls, supports_librabbitmq from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq -from .utils.compat import OrderedDict from .utils.functional import lazy from .utils.url import parse_url, urlparse diff --git a/kombu/five.py b/kombu/five.py index 33c77fe5..3ea7ff0f 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -96,7 +96,7 @@ if PY3: # pragma: no cover from queue import Queue, Empty, Full, LifoQueue from itertools import zip_longest - from io import StringIO, BytesIO + from io import StringIO map = map zip = zip @@ -143,10 +143,7 @@ else: izip as zip, izip_longest as zip_longest, ) - try: - from cStringIO import StringIO # noqa - except ImportError: # pragma: no cover - from StringIO import StringIO # noqa + from io import StringIO as WhateverIO string = unicode # noqa string_t = basestring # noqa @@ -183,10 +180,8 @@ else: exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""") - BytesIO = WhateverIO = StringIO # noqa - -def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])): +def with_metaclass(Type, skip_attrs={'__dict__', '__weakref__'}): """Class decorator to set metaclass. Works with both Python 3 and Python 3 and it does not add @@ -196,8 +191,8 @@ def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])): """ def _clone_with_metaclass(Class): - attrs = dict((key, value) for key, value in items(vars(Class)) - if key not in skip_attrs) + attrs = {key: value for key, value in items(vars(Class)) + if key not in skip_attrs} return Type(Class.__name__, Class.__bases__, attrs) return _clone_with_metaclass diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 5c70a382..f6dca574 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -22,7 +22,7 @@ from .common import maybe_declare, oid_from from .exceptions import InconsistencyError from .five import range from .log import get_logger -from .utils import cached_property, kwdict, uuid, reprcall +from .utils import cached_property, uuid, reprcall REPLY_QUEUE_EXPIRES = 10 @@ -102,7 +102,7 @@ class Node(object): reprcall(method, (), kwargs=arguments), reply_to, ticket) handle = reply_to and self.handle_call or self.handle_cast try: - reply = handle(method, kwdict(arguments)) + reply = handle(method, arguments) except SystemExit: raise except Exception as exc: @@ -130,7 +130,7 @@ class Node(object): if message: self.adjust_clock(message.headers.get('clock') or 0) if not destination or self.hostname in destination: - return self.dispatch(**kwdict(body)) + return self.dispatch(**body) dispatch_from_message = handle_message def reply(self, data, exchange, routing_key, ticket, **kwargs): diff --git a/kombu/serialization.py b/kombu/serialization.py index 47b8f91b..b7ce2c27 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -19,11 +19,12 @@ except ImportError: # pragma: no cover from collections import namedtuple from contextlib import contextmanager +from io import BytesIO from .exceptions import ( ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled ) -from .five import BytesIO, reraise, text_t +from .five import reraise, text_t from .utils import entrypoints from .utils.encoding import str_to_bytes, bytes_t @@ -451,5 +452,5 @@ for ep, args in entrypoints('kombu.serializers'): # pragma: no cover def prepare_accept_content(l, name_to_type=registry.name_to_type): if l is not None: - return set(n if '/' in n else name_to_type[n] for n in l) + return {n if '/' in n else name_to_type[n] for n in l} return l diff --git a/kombu/tests/case.py b/kombu/tests/case.py index e8b6d32b..c265cb91 100644 --- a/kombu/tests/case.py +++ b/kombu/tests/case.py @@ -5,12 +5,13 @@ import sys import types from functools import wraps +from io import StringIO import mock from nose import SkipTest -from kombu.five import builtins, string_t, StringIO +from kombu.five import builtins, string_t from kombu.utils.encoding import ensure_bytes try: diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index c9573c2c..0d68e643 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -230,7 +230,7 @@ class test_Consumer(Case): b = Consumer(self.connection, accept=['json', 'pickle']) self.assertSetEqual( b.accept, - set(['application/json', 'application/x-python-serialize']), + {'application/json', 'application/x-python-serialize'}, ) c = Consumer(self.connection, accept=b.accept) self.assertSetEqual(b.accept, c.accept) diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py index 5c4a50d5..4405d376 100644 --- a/kombu/tests/transport/test_base.py +++ b/kombu/tests/transport/test_base.py @@ -52,7 +52,7 @@ class test_Message(Case): m.ack() def test_ack_respects_no_ack_consumers(self): - self.channel.no_ack_consumers = set(['abc']) + self.channel.no_ack_consumers = {'abc'} self.message.delivery_info['consumer_tag'] = 'abc' ack = self.channel.basic_ack = Mock() @@ -61,7 +61,7 @@ class test_Message(Case): self.assertFalse(ack.called) def test_ack_missing_consumer_tag(self): - self.channel.no_ack_consumers = set(['abc']) + self.channel.no_ack_consumers = {'abc'} self.message.delivery_info = {} ack = self.channel.basic_ack = Mock() diff --git a/kombu/tests/utils/test_utils.py b/kombu/tests/utils/test_utils.py index 0248a303..0cb5068f 100644 --- a/kombu/tests/utils/test_utils.py +++ b/kombu/tests/utils/test_utils.py @@ -5,11 +5,7 @@ import pickle import sys from functools import wraps - -if sys.version_info >= (3, 0): - from io import StringIO, BytesIO -else: - from StringIO import StringIO, StringIO as BytesIO # noqa +from io import StringIO, BytesIO from kombu import version_info_t from kombu import utils @@ -102,18 +98,6 @@ class test_UUID(Case): sys.modules['celery.utils'] = old_utils -class test_Misc(Case): - - def test_kwdict(self): - - def f(**kwargs): - return kwargs - - kw = {'foo': 'foo', - 'bar': 'bar'} - self.assertTrue(f(**utils.kwdict(kw))) - - class MyStringIO(StringIO): def close(self): @@ -369,7 +353,7 @@ class test_shufflecycle(Case): prev_repeat, utils.repeat = utils.repeat, Mock() try: utils.repeat.return_value = list(range(10)) - values = set(['A', 'B', 'C']) + values = {'A', 'B', 'C'} cycle = utils.shufflecycle(values) seen = set() for i in range(10): diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 449bc2fb..61756a7e 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -27,8 +27,9 @@ except ImportError: # pragma: no cover get_client = ResponseError = None # noqa # dots are replaced by dash, all other punctuation replaced by underscore. -CHARS_REPLACE_TABLE = dict( - (ord(c), 0x5f) for c in string.punctuation if c not in '_') +CHARS_REPLACE_TABLE = { + ord(c): 0x5f for c in string.punctuation if c not in '_' +} class Channel(virtual.Channel): diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 68cb053c..790f4270 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -66,8 +66,9 @@ logger = get_logger(__name__) # dots are replaced by dash, all other punctuation # replaced by underscore. -CHARS_REPLACE_TABLE = dict((ord(c), 0x5f) - for c in string.punctuation if c not in '-_.') +CHARS_REPLACE_TABLE = { + ord(c): 0x5f for c in string.punctuation if c not in '-_.' +} CHARS_REPLACE_TABLE[0x2e] = 0x2d # '.' -> '-' @@ -154,7 +155,7 @@ class Table(Domain): return item def get_exchanges(self): - return list(set(i['exchange'] for i in self.select())) + return list({i['exchange'] for i in self.select()}) def _get_queue_item(self, queue): return self._try_first("""WHERE queue = '%s' limit 1""" % queue) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index c3f6decd..b93fa7af 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -896,8 +896,8 @@ class Channel(virtual.Channel): @property def active_queues(self): """Set of queues being consumed from (excluding fanout queues).""" - return set(queue for queue in self._active_queues - if queue not in self.active_fanout_queues) + return {queue for queue in self._active_queues + if queue not in self.active_fanout_queues} class Transport(virtual.Transport): diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index cb844de9..35f46bd3 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -15,6 +15,7 @@ import sys import warnings from array import array +from collections import OrderedDict from itertools import count from multiprocessing.util import Finalize from time import sleep @@ -23,8 +24,7 @@ from amqp.protocol import queue_declare_ok_t from kombu.exceptions import ResourceError, ChannelError from kombu.five import Empty, items, monotonic -from kombu.utils import emergency_dump_state, kwdict, say, uuid -from kombu.utils.compat import OrderedDict +from kombu.utils import emergency_dump_state, say, uuid from kombu.utils.encoding import str_to_bytes, bytes_to_str from kombu.transport import base @@ -253,7 +253,7 @@ class Message(base.Message): 'delivery_info': properties.get('delivery_info'), 'postencode': 'utf-8', }) - super(Message, self).__init__(channel, **kwdict(kwargs)) + super(Message, self).__init__(channel, **kwargs) def serializable(self): props = self.properties diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 0745ddfe..43e1322d 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -35,7 +35,7 @@ except ImportError: # pragma: no cover FILENO_ERRORS = (AttributeError, ValueError) # noqa -__all__ = ['EqualityDict', 'say', 'uuid', 'kwdict', 'maybe_list', +__all__ = ['EqualityDict', 'say', 'uuid', 'maybe_list', 'fxrange', 'fxrangemax', 'retry_over_time', 'emergency_dump_state', 'cached_property', 'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno'] @@ -159,22 +159,6 @@ def uuid(): gen_unique_id = uuid -if sys.version_info >= (2, 6, 5): - - def kwdict(kwargs): - return kwargs -else: - def kwdict(kwargs): # pragma: no cover # noqa - """Make sure keyword arguments are not in Unicode. - - This should be fixed in newer Python versions, - see: http://bugs.python.org/issue4978. - - """ - return dict((key.encode('utf-8'), value) - for key, value in items(kwargs)) - - def maybe_list(v): if v is None: return [] diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index 16028997..a306c672 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -8,34 +8,6 @@ Helps compatibility with older Python versions. from __future__ import absolute_import -############## timedelta_seconds() -> delta.total_seconds #################### -from datetime import timedelta - -HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds') - - -if HAVE_TIMEDELTA_TOTAL_SECONDS: # pragma: no cover - - def timedelta_seconds(delta): - """Convert :class:`datetime.timedelta` to seconds. - - Doesn't account for negative values. - - """ - return max(delta.total_seconds(), 0) - -else: # pragma: no cover - - def timedelta_seconds(delta): # noqa - """Convert :class:`datetime.timedelta` to seconds. - - Doesn't account for negative values. - - """ - if delta.days < 0: - return 0 - return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5) - ############## socket.error.errno ############################################ @@ -52,9 +24,3 @@ def get_errno(exc): except AttributeError: pass return 0 - -############## collections.OrderedDict ####################################### -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict # noqa diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index e4961cdc..fdb24b6e 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -53,9 +53,9 @@ WRITE = POLL_WRITE = 0x004 ERR = POLL_ERR = 0x008 | 0x010 try: - SELECT_BAD_FD = set((errno.EBADF, errno.WSAENOTSOCK)) + SELECT_BAD_FD = {errno.EBADF, errno.WSAENOTSOCK} except AttributeError: - SELECT_BAD_FD = set((errno.EBADF,)) + SELECT_BAD_FD = {errno.EBADF} class Poller(object): diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 8839fca2..9f349d9f 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -6,8 +6,6 @@ except ImportError: from urllib import unquote # noqa from urlparse import urlparse, parse_qsl # noqa -from . import kwdict - def _parse_url(url): scheme = urlparse(url).scheme @@ -28,7 +26,7 @@ def _parse_url(url): unquote(parts.username or '') or None, unquote(parts.password or '') or None, unquote(path or '') or None, - kwdict(dict(parse_qsl(parts.query)))) + dict(parse_qsl(parts.query))) def parse_url(url): -- cgit v1.2.1 From 4b26895232ed2fcee9dcf33c1ddf04d876899321 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 19 May 2014 22:53:12 +0100 Subject: Using print function --- kombu/tests/test_serialization.py | 4 ---- kombu/tests/transport/virtual/test_base.py | 6 +++--- kombu/transport/virtual/__init__.py | 20 ++++++++++++-------- kombu/utils/__init__.py | 19 ++++++++++--------- 4 files changed, 25 insertions(+), 24 deletions(-) (limited to 'kombu') diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index 23120711..c048f799 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -69,10 +69,6 @@ ApVGggcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgdGggbGF6eSBkb2c=\ """)) -def say(m): - sys.stderr.write('%s\n' % (m, )) - - registry.register('testS', lambda s: s, lambda s: 'decoded', 'application/testS', 'utf-8') diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index d249c4e7..fbe8f064 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -393,8 +393,8 @@ class test_Channel(Case): self.assertFalse(q._delivered) @patch('kombu.transport.virtual.emergency_dump_state') - @patch('kombu.transport.virtual.say') - def test_restore_unacked_once_when_unrestored(self, say, + @patch('__builtin__.print') + def test_restore_unacked_once_when_unrestored(self, print_, emergency_dump_state): q = self.channel.qos q._flush = Mock() @@ -413,7 +413,7 @@ class test_Channel(Case): self.channel.do_restore = True q.restore_unacked_once() - self.assertTrue(say.called) + self.assertTrue(print_.called) self.assertTrue(emergency_dump_state.called) def test_basic_recover(self): diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 35f46bd3..5a05386e 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -7,7 +7,7 @@ Virtual transport implementation. Emulates the AMQ API for non-AMQ transports. """ -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import, print_function, unicode_literals import base64 import socket @@ -24,7 +24,7 @@ from amqp.protocol import queue_declare_ok_t from kombu.exceptions import ResourceError, ChannelError from kombu.five import Empty, items, monotonic -from kombu.utils import emergency_dump_state, say, uuid +from kombu.utils import emergency_dump_state, uuid from kombu.utils.encoding import str_to_bytes, bytes_to_str from kombu.transport import base @@ -44,6 +44,9 @@ Cannot redeclare exchange {0!r} in vhost {1!r} with \ different type, durable, autodelete or arguments value.\ """ +RESTORING_FMT = 'Restoring {0!r} unacknowledged message(s)' +RESTORE_PANIC_FMT = 'UNABLE TO RESTORE {0} MESSAGES: {1}' + class Base64(object): @@ -196,7 +199,7 @@ class QoS(object): delivered.clear() return errors - def restore_unacked_once(self): + def restore_unacked_once(self, stderr=None): """Restores all unacknowledged messages at shutdown/gc collect. Will only be done once for each instance. @@ -204,6 +207,7 @@ class QoS(object): """ self._on_collect.cancel() self._flush() + stderr = sys.stderr if stderr is None else stderr state = self._delivered if not self.restore_at_shutdown or not self.channel.do_restore: @@ -213,15 +217,15 @@ class QoS(object): return try: if state: - say('Restoring {0!r} unacknowledged message(s).', - len(self._delivered)) + print(RESTORING_FMT.format(len(self._delivered)), + file=stderr) unrestored = self.restore_unacked() if unrestored: errors, messages = list(zip(*unrestored)) - say('UNABLE TO RESTORE {0} MESSAGES: {1}', - len(errors), errors) - emergency_dump_state(messages) + print(RESTORE_PANIC_FMT.format(len(errors), errors), + file=stderr) + emergency_dump_state(messages, stderr=stderr) finally: state.restored = True diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 43e1322d..ba4f88be 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -5,7 +5,7 @@ kombu.utils Internal utilities. """ -from __future__ import absolute_import, print_function +from __future__ import absolute_import, print_function, unicode_literals import importlib import numbers @@ -35,7 +35,7 @@ except ImportError: # pragma: no cover FILENO_ERRORS = (AttributeError, ValueError) # noqa -__all__ = ['EqualityDict', 'say', 'uuid', 'maybe_list', +__all__ = ['EqualityDict', 'uuid', 'maybe_list', 'fxrange', 'fxrangemax', 'retry_over_time', 'emergency_dump_state', 'cached_property', 'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno'] @@ -136,10 +136,6 @@ class EqualityDict(dict): return dict.__delitem__(self, eqhash(key)) -def say(m, *fargs, **fkwargs): - print(str(m).format(*fargs, **fkwargs), file=sys.stderr) - - def uuid4(): # Workaround for http://bugs.python.org/issue4607 if ctypes and _uuid_generate_random: # pragma: no cover @@ -241,21 +237,26 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, sleep(abs(int(tts) - tts)) -def emergency_dump_state(state, open_file=open, dump=None): +def emergency_dump_state(state, open_file=open, dump=None, stderr=None): from pprint import pformat from tempfile import mktemp + stderr = sys.stderr if stderr is None else stderr if dump is None: import pickle dump = pickle.dump persist = mktemp() - say('EMERGENCY DUMP STATE TO FILE -> {0} <-', persist) + print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), + file=stderr) fh = open_file(persist, 'w') try: try: dump(state, fh, protocol=0) except Exception as exc: - say('Cannot pickle state: {0!r}. Fallback to pformat.', exc) + print( + 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc), + file=stderr, + ) fh.write(default_encode(pformat(state))) finally: fh.flush() -- cgit v1.2.1 From df3fa3c2d50f8ccb90b2d736b9bf7c73abf838bc Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 18:19:14 +0100 Subject: parse_url function should not know about mongodb, and the transport already sets can_parse_url --- kombu/tests/test_connection.py | 4 ---- kombu/utils/url.py | 10 +--------- 2 files changed, 1 insertion(+), 13 deletions(-) (limited to 'kombu') diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 58790db0..147faaf0 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -31,10 +31,6 @@ class test_connection_utils(Case): result = parse_url(self.url) self.assertDictEqual(result, self.expected) - def test_parse_url_mongodb(self): - result = parse_url('mongodb://example.com/') - self.assertEqual(result['hostname'], 'example.com/') - def test_parse_generated_as_uri(self): conn = Connection(self.url) info = conn.info() diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 9f349d9f..209d549c 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -12,17 +12,9 @@ def _parse_url(url): schemeless = url[len(scheme) + 3:] # parse with HTTP URL semantics parts = urlparse('http://' + schemeless) - - # The first pymongo.Connection() argument (host) can be - # a mongodb connection URI. If this is the case, don't - # use port but let pymongo get the port(s) from the URI instead. - # This enables the use of replica sets and sharding. - # See pymongo.Connection() for more info. - port = scheme != 'mongodb' and parts.port or None - hostname = schemeless if scheme == 'mongodb' else parts.hostname path = parts.path or '' path = path[1:] if path and path[0] == '/' else path - return (scheme, unquote(hostname or '') or None, port, + return (scheme, unquote(parts.hostname or '') or None, parts.port, unquote(parts.username or '') or None, unquote(parts.password or '') or None, unquote(path or '') or None, -- cgit v1.2.1 From 14ecb44a080720d3d395f59834881e8099c26d1a Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 18:41:26 +0100 Subject: flakes --- kombu/five.py | 2 +- kombu/utils/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'kombu') diff --git a/kombu/five.py b/kombu/five.py index 3ea7ff0f..2bd631fb 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -143,7 +143,7 @@ else: izip as zip, izip_longest as zip_longest, ) - from io import StringIO as WhateverIO + from io import StringIO as WhateverIO # noqa string = unicode # noqa string_t = basestring # noqa diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index ba4f88be..0fe80b0c 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -246,14 +246,14 @@ def emergency_dump_state(state, open_file=open, dump=None, stderr=None): import pickle dump = pickle.dump persist = mktemp() - print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), + print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), ## noqa file=stderr) fh = open_file(persist, 'w') try: try: dump(state, fh, protocol=0) except Exception as exc: - print( + print( # noqa 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc), file=stderr, ) -- cgit v1.2.1 From f4ef17236e0085b0d948162cfbaa6d42935e2dca Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 18:41:44 +0100 Subject: Adds sanitize_url and as_url to kombu.utils.url --- kombu/connection.py | 40 +++++++++++----------------------------- kombu/utils/url.py | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 31 deletions(-) (limited to 'kombu') diff --git a/kombu/connection.py b/kombu/connection.py index 7ddbf772..63b4b685 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -12,13 +12,8 @@ import socket from collections import OrderedDict from contextlib import contextmanager -from functools import partial from itertools import count, cycle from operator import itemgetter -try: - from urllib.parse import quote -except ImportError: # Py2 - from urllib import quote # noqa # jython breaks on relative import for .exceptions for some reason # (Issue #112) @@ -28,7 +23,7 @@ from .log import get_logger from .transport import get_transport_cls, supports_librabbitmq from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq from .utils.functional import lazy -from .utils.url import parse_url, urlparse +from .utils.url import as_url, parse_url, quote, urlparse __all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] @@ -569,36 +564,23 @@ class Connection(object): self.password, self.virtual_host, self.port, repr(self.transport_options)) - def as_uri(self, include_password=False, mask=''): + def as_uri(self, include_password=False, mask='**', + getfields=itemgetter('port', 'userid', 'password', + 'virtual_host', 'transport')): """Convert connection parameters to URL form.""" hostname = self.hostname or 'localhost' if self.transport.can_parse_url: if self.uri_prefix: return '%s+%s' % (self.uri_prefix, hostname) return self.hostname - quoteS = partial(quote, safe='') # strict quote fields = self.info() - port, userid, password, transport = itemgetter( - 'port', 'userid', 'password', 'transport' - )(fields) - url = '%s://' % transport - if userid or password: - if userid: - url += quoteS(userid) - if password: - if include_password: - url += ':' + quoteS(password) - else: - url += ':' + mask if mask else '' - url += '@' - url += quoteS(fields['hostname']) - if port: - url += ':%s' % (port, ) - - url += '/' + quote(fields['virtual_host']) - if self.uri_prefix: - return '%s+%s' % (self.uri_prefix, url) - return url + port, userid, password, vhost, transport = getfields(fields) + scheme = ('{0}+{1}'.format(self.uri_prefix, transport) + if self.uri_prefix else transport) + return as_url( + scheme, hostname, port, userid, password, quote(vhost), + sanitize=not include_password, mask=mask, + ) def Pool(self, limit=None, preload=None): """Pool of connections. diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 209d549c..8f8b5290 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -1,11 +1,15 @@ from __future__ import absolute_import +from functools import partial + try: - from urllib.parse import unquote, urlparse, parse_qsl + from urllib.parse import parse_qsl, quote, unquote, urlparse except ImportError: - from urllib import unquote # noqa + from urllib import quote, unquote # noqa from urlparse import urlparse, parse_qsl # noqa +safequote = partial(quote, safe='') + def _parse_url(url): scheme = urlparse(url).scheme @@ -26,3 +30,27 @@ def parse_url(url): return dict(transport=scheme, hostname=host, port=port, userid=user, password=password, virtual_host=path, **query) + + +def as_url(scheme, host=None, port=None, user=None, password=None, + path=None, query=None, sanitize=False, mask='**'): + parts = ['{0}://'.format(scheme)] + if user or password: + if user: + parts.append(safequote(user)) + if password: + if sanitize: + parts.extend([':', mask] if mask else [':']) + else: + parts.extend([':', safequote(password)]) + parts.append('@') + parts.append(safequote(host)) + if port: + parts.extend([':', port]) + parts.extend(['/', path]) + return ''.join(str(part) for part in parts if part) + + +def sanitize_url(url, mask='**'): + return as_url(*_parse_url(url), sanitize=True, mask=mask) + -- cgit v1.2.1 From 66419eb780c8392286212c7a73c525277b10c970 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 19:00:24 +0100 Subject: Adds maybe_sanitize_url to kombu.utils.url --- kombu/utils/url.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'kombu') diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 8f8b5290..5e6ec52d 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -8,6 +8,8 @@ except ImportError: from urllib import quote, unquote # noqa from urlparse import urlparse, parse_qsl # noqa +from kombu.five import string_t + safequote = partial(quote, safe='') @@ -54,3 +56,9 @@ def as_url(scheme, host=None, port=None, user=None, password=None, def sanitize_url(url, mask='**'): return as_url(*_parse_url(url), sanitize=True, mask=mask) + +def maybe_sanitize_url(url, mask='**'): + if isinstance(url, string_t) and '://' in url: + return sanitize_url(url, mask) + return url + -- cgit v1.2.1 From 81ccd384aaa218c019006e9428b3b0fc5f54e9f1 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 19:05:01 +0100 Subject: Tests passing --- kombu/tests/test_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 147faaf0..6bd3303f 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -17,7 +17,7 @@ class test_connection_utils(Case): def setUp(self): self.url = 'amqp://user:pass@localhost:5672/my/vhost' - self.nopass = 'amqp://user@localhost:5672/my/vhost' + self.nopass = 'amqp://user:**@localhost:5672/my/vhost' self.expected = { 'transport': 'amqp', 'userid': 'user', -- cgit v1.2.1 From 3d963c1eb641e593469aa28e53db447c8f895115 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 21:49:47 +0100 Subject: Removes anyjson dependency :sad: --- kombu/five.py | 9 +++++++ kombu/serialization.py | 9 ++----- kombu/tests/__init__.py | 8 ------ kombu/tests/mocks.py | 5 ++-- kombu/tests/test_messaging.py | 10 +++---- kombu/tests/transport/test_redis.py | 2 +- kombu/transport/SLMQ.py | 3 +-- kombu/transport/SQS.py | 3 +-- kombu/transport/beanstalk.py | 3 +-- kombu/transport/couchdb.py | 3 +-- kombu/transport/django/__init__.py | 4 +-- kombu/transport/filesystem.py | 4 +-- kombu/transport/mongodb.py | 2 +- kombu/transport/redis.py | 2 +- kombu/transport/sqlalchemy/__init__.py | 2 +- kombu/transport/zookeeper.py | 4 +-- kombu/utils/json.py | 49 ++++++++++++++++++++++++++++++++++ 17 files changed, 80 insertions(+), 42 deletions(-) create mode 100644 kombu/utils/json.py (limited to 'kombu') diff --git a/kombu/five.py b/kombu/five.py index 2bd631fb..9c875601 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -10,6 +10,15 @@ """ from __future__ import absolute_import +try: + buffer_t = buffer +except NameError: # pragma: no cover + # Py3 does not have buffer, only use this for isa checks. + + class buffer_t(object): # noqa + pass + + ############## py3k ######################################################### import sys PY3 = sys.version_info[0] == 3 diff --git a/kombu/serialization.py b/kombu/serialization.py index b7ce2c27..53b8cddf 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -308,14 +308,9 @@ def raw_encode(data): def register_json(): """Register a encoder/decoder for JSON serialization.""" - from anyjson import loads as json_loads, dumps as json_dumps + from kombu.utils import json as _json - def _loads(obj): - if isinstance(obj, bytes_t): - obj = obj.decode() - return json_loads(obj) - - registry.register('json', json_dumps, _loads, + registry.register('json', _json.dumps, _json.loads, content_type='application/json', content_encoding='utf-8') diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py index fb9f21a2..e5ed4047 100644 --- a/kombu/tests/__init__.py +++ b/kombu/tests/__init__.py @@ -1,19 +1,11 @@ from __future__ import absolute_import -import anyjson import atexit import os import sys from kombu.exceptions import VersionMismatch -# avoid json implementation inconsistencies. -try: - import json # noqa - anyjson.force_implementation('json') -except ImportError: - anyjson.force_implementation('simplejson') - def teardown(): # Workaround for multiprocessing bug where logging diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 836457eb..5af53b19 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -2,9 +2,8 @@ from __future__ import absolute_import from itertools import count -import anyjson - from kombu.transport import base +from kombu.utils import json class Message(base.Message): @@ -104,7 +103,7 @@ class Channel(base.StdChannel): def message_to_python(self, message, *args, **kwargs): self._called('message_to_python') - return Message(self, body=anyjson.dumps(message), + return Message(self, body=json.dumps(message), delivery_tag=next(self.deliveries), throw_decode_error=self.throw_decode_error, content_type='application/json', diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 0d68e643..877d36cc 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,6 +1,5 @@ from __future__ import absolute_import, unicode_literals -import anyjson import pickle from collections import defaultdict @@ -8,6 +7,7 @@ from collections import defaultdict from kombu import Connection, Consumer, Producer, Exchange, Queue from kombu.exceptions import MessageStateError from kombu.utils import ChannelPromise +from kombu.utils import json from .case import Case, Mock, patch from .mocks import Transport @@ -73,7 +73,7 @@ class test_Producer(Case): channel = self.connection.channel() p = Producer(channel, self.exchange, serializer='json') m, ctype, cencoding = p._prepare(message, headers={}) - self.assertDictEqual(message, anyjson.loads(m)) + self.assertDictEqual(message, json.loads(m)) self.assertEqual(ctype, 'application/json') self.assertEqual(cencoding, 'utf-8') @@ -89,7 +89,7 @@ class test_Producer(Case): self.assertEqual(headers['compression'], 'application/x-gzip') import zlib self.assertEqual( - anyjson.loads(zlib.decompress(m).decode('utf-8')), + json.loads(zlib.decompress(m).decode('utf-8')), message, ) @@ -184,7 +184,7 @@ class test_Producer(Case): self.assertIn('basic_publish', channel) m, exc, rkey = ret - self.assertDictEqual(message, anyjson.loads(m['body'])) + self.assertDictEqual(message, json.loads(m['body'])) self.assertDictContainsSubset({'content_type': 'application/json', 'content_encoding': 'utf-8', 'priority': 0}, m) @@ -580,7 +580,7 @@ class test_Consumer(Case): self.assertTrue(thrown) m, exc = thrown[0] - self.assertEqual(anyjson.loads(m), {'foo': 'bar'}) + self.assertEqual(json.loads(m), {'foo': 'bar'}) self.assertIsInstance(exc, ValueError) def test_recover(self): diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index fa1782b2..f9dc685a 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -3,7 +3,6 @@ from __future__ import absolute_import import socket import types -from anyjson import dumps, loads from collections import defaultdict from itertools import count @@ -12,6 +11,7 @@ from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, Queue as _Queue from kombu.transport import virtual from kombu.utils import eventio # patch poll +from kombu.utils.json import dumps, loads from kombu.tests.case import ( Case, Mock, call, module_exists, skip_if_not_module, patch, diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 61756a7e..3ea573af 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -10,13 +10,12 @@ from __future__ import absolute_import import socket import string -from anyjson import loads, dumps - import os from kombu.five import Empty, text_t from kombu.utils import cached_property # , uuid from kombu.utils.encoding import bytes_to_str, safe_str +from kombu.utils.json import loads, dumps from . import virtual diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 790f4270..2ad6b0e0 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -43,8 +43,6 @@ import collections import socket import string -from anyjson import loads, dumps - import boto from boto import exception from boto import sdb as _sdb @@ -58,6 +56,7 @@ from kombu.five import Empty, range, text_t from kombu.log import get_logger from kombu.utils import cached_property, uuid from kombu.utils.encoding import bytes_to_str, safe_str +from kombu.utils.json import loads, dumps from kombu.transport.virtual import scheduling from . import virtual diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index 544fd438..b7688631 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -13,10 +13,9 @@ from __future__ import absolute_import import beanstalkc import socket -from anyjson import loads, dumps - from kombu.five import Empty from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps from . import virtual diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index 009dbbdc..a4d87d99 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -13,11 +13,10 @@ from __future__ import absolute_import import socket import couchdb -from anyjson import loads, dumps - from kombu.five import Empty from kombu.utils import uuid4 from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps from . import virtual diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py index 67bfa576..4493cf19 100644 --- a/kombu/transport/django/__init__.py +++ b/kombu/transport/django/__init__.py @@ -1,14 +1,14 @@ """Kombu transport using the Django database as a message store.""" from __future__ import absolute_import -from anyjson import loads, dumps - from django.conf import settings from django.core import exceptions as errors from kombu.five import Empty from kombu.transport import virtual from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps + from .models import Queue diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index c83dcdc3..92917ab4 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -7,8 +7,6 @@ Transport using the file system as the message store. """ from __future__ import absolute_import -from anyjson import loads, dumps - import os import shutil import uuid @@ -19,6 +17,8 @@ from kombu.exceptions import ChannelError from kombu.five import Empty, monotonic from kombu.utils import cached_property from kombu.utils.encoding import bytes_to_str, str_to_bytes +from kombu.utils.json import loads, dumps + VERSION = (1, 0, 0) __version__ = '.'.join(map(str, VERSION)) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 5f726d09..ec3fa5dd 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -13,12 +13,12 @@ from __future__ import absolute_import import pymongo from pymongo import errors -from anyjson import loads, dumps from pymongo import MongoClient, uri_parser from kombu.five import Empty from kombu.syn import _detect_environment from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps from . import virtual diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 6609292e..30015a60 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -16,7 +16,6 @@ from contextlib import contextmanager from time import time from amqp import promise -from anyjson import loads, dumps from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, values, string_t @@ -24,6 +23,7 @@ from kombu.log import get_logger from kombu.utils import cached_property, uuid from kombu.utils.eventio import poll, READ, ERR from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps from kombu.utils.url import _parse_url NO_ROUTE_ERROR = """ diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index c085b469..0dd5a9da 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -4,7 +4,6 @@ from __future__ import absolute_import -from anyjson import loads, dumps from sqlalchemy import create_engine from sqlalchemy.exc import OperationalError from sqlalchemy.orm import sessionmaker @@ -13,6 +12,7 @@ from kombu.five import Empty from kombu.transport import virtual from kombu.utils import cached_property from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps from .models import (ModelBase, Queue as QueueBase, Message as MessageBase, class_registry, metadata) diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index 6645507a..4764c0e3 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -30,14 +30,12 @@ from __future__ import absolute_import import os import socket -from anyjson import loads, dumps - from kombu.five import Empty from kombu.utils.encoding import bytes_to_str +from kombu.utils.json import loads, dumps from . import virtual - try: import kazoo from kazoo.client import KazooClient diff --git a/kombu/utils/json.py b/kombu/utils/json.py new file mode 100644 index 00000000..9dd03429 --- /dev/null +++ b/kombu/utils/json.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import sys + +from kombu.five import buffer_t, text_t, bytes_t + +try: + import simplejson as json +except ImportError: # pragma: no cover + import json # noqa + +IS_PY3 = sys.version_info[0] == 3 + + +class JSONEncoder(json.JSONEncoder): + + def default(self, obj, _super=json.JSONEncoder.default): + try: + _super(self, obj) + except TypeError: + try: + reducer = obj.__json__ + except AttributeError: + raise + else: + return reducer() + + +def dumps(s, _dumps=json.dumps, cls=JSONEncoder): + return _dumps(s, cls=cls) + + +def loads(s, _loads=json.loads, decode_bytes=IS_PY3): + # None of the json implementations supports decoding from + # a buffer/memoryview, or even reading from a stream + # (load is just loads(fp.read())) + # but this is Python, we love copying strings, preferably many times + # over. Note that pickle does support buffer/memoryview + # + if isinstance(s, memoryview): + s = s.tobytes().decode('utf-8') + elif isinstance(s, bytearray): + s = s.decode('utf-8') + elif decode_bytes and isinstance(s, bytes_t): + s = s.decode('utf-8') + elif isinstance(s, buffer_t): + s = text_t(s) # ... awwwwwww :( + return _loads(s) -- cgit v1.2.1 From 11c9c7b7b427c46544637c77801e4b8132a22a13 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 21:55:29 +0100 Subject: flakes --- kombu/tests/transport/virtual/test_base.py | 41 ++++++++++++++++++++---------- kombu/transport/mongodb.py | 9 ++++--- kombu/transport/virtual/__init__.py | 13 +++++++--- kombu/transport/zookeeper.py | 6 +++-- kombu/utils/url.py | 1 - 5 files changed, 46 insertions(+), 24 deletions(-) (limited to 'kombu') diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index 0627aed5..cbb1e58b 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -516,20 +516,35 @@ class test_Channel(Case): self.channel.queue_declare(queue='21wisdjwqe', passive=True) def test_get_message_priority(self): + def _message(priority): - return self.channel.prepare_message('the message with priority', - priority=priority) - - self.assertEqual(self.channel._get_message_priority(_message(5)), - 5) - self.assertEqual(self.channel._get_message_priority(_message(self.channel.min_priority - 10)), - self.channel.min_priority) - self.assertEqual(self.channel._get_message_priority(_message(self.channel.max_priority + 10)), - self.channel.max_priority) - self.assertEqual(self.channel._get_message_priority(_message('foobar')), - self.channel.default_priority) - self.assertEqual(self.channel._get_message_priority(_message(2), reverse=True), - self.channel.max_priority - 2) + return self.channel.prepare_message( + 'the message with priority', priority=priority, + ) + + self.assertEqual( + self.channel._get_message_priority(_message(5)), 5, + ) + self.assertEqual( + self.channel._get_message_priority( + _message(self.channel.min_priority - 10), + ), + self.channel.min_priority, + ) + self.assertEqual( + self.channel._get_message_priority( + _message(self.channel.max_priority + 10), + ), + self.channel.max_priority, + ) + self.assertEqual( + self.channel._get_message_priority(_message('foobar')), + self.channel.default_priority, + ) + self.assertEqual( + self.channel._get_message_priority(_message(2), reverse=True), + self.channel.max_priority - 2, + ) class test_Transport(Case): diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index ec3fa5dd..5b26118a 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -116,10 +116,11 @@ class Channel(virtual.Channel): return self.get_messages().find({'queue': queue}).count() def _put(self, queue, message, **kwargs): - self.get_messages().insert({'payload': dumps(message), - 'queue': queue, - 'priority': self._get_message_priority(message, - reverse=True)}) + self.get_messages().insert({ + 'payload': dumps(message), + 'queue': queue, + 'priority': self._get_message_priority(message, reverse=True), + }) def _purge(self, queue): size = self._size(queue) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 21bbbe23..34094d3e 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -733,13 +733,18 @@ class Channel(AbstractChannel, base.StdChannel): return self._cycle def _get_message_priority(self, message, reverse=False): - """Gets priority from message and converts it to the bounds: [0, 9]. + """Get priority from message and limit the value within a + boundary of 0 to 9. + Higher value has more priority. + """ try: - priority = max(min(int(message['properties']['delivery_info']['priority']), - self.max_priority), - self.min_priority) + priority = max( + min(int(message['properties']['delivery_info']['priority']), + self.max_priority), + self.min_priority, + ) except (TypeError, ValueError, KeyError): priority = self.default_priority diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index 4764c0e3..ab6d72f8 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -100,8 +100,10 @@ class Channel(virtual.Channel): return queue def _put(self, queue, message, **kwargs): - queue = self._get_queue(queue) - queue.put(dumps(message), priority=self._get_message_priority(message, reverse=True)) + return self._get_queue(queue).put( + dumps(message), + priority=self._get_message_priority(message, reverse=True), + ) def _get(self, queue): queue = self._get_queue(queue) diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 5e6ec52d..3d344ebb 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -61,4 +61,3 @@ def maybe_sanitize_url(url, mask='**'): if isinstance(url, string_t) and '://' in url: return sanitize_url(url, mask) return url - -- cgit v1.2.1 From 095ba022631559d8a01b21f259aa8babbabd3d7a Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 22:51:09 +0100 Subject: kombu.five now extends amqp.five --- kombu/five.py | 199 +--------------------------------------------------------- 1 file changed, 3 insertions(+), 196 deletions(-) (limited to 'kombu') diff --git a/kombu/five.py b/kombu/five.py index 9c875601..9022b899 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ - celery.five + kombu.five ~~~~~~~~~~~ Compatibility implementations of features @@ -10,198 +10,5 @@ """ from __future__ import absolute_import -try: - buffer_t = buffer -except NameError: # pragma: no cover - # Py3 does not have buffer, only use this for isa checks. - - class buffer_t(object): # noqa - pass - - -############## py3k ######################################################### -import sys -PY3 = sys.version_info[0] == 3 - -try: - reload = reload # noqa -except NameError: # pragma: no cover - from imp import reload # noqa - -try: - from collections import UserList # noqa -except ImportError: # pragma: no cover - from UserList import UserList # noqa - -try: - from collections import UserDict # noqa -except ImportError: # pragma: no cover - from UserDict import UserDict # noqa - -try: - bytes_t = bytes -except NameError: # pragma: no cover - bytes_t = str # noqa - -############## time.monotonic ################################################ - -if sys.version_info < (3, 3): - - import platform - SYSTEM = platform.system() - - if SYSTEM == 'Darwin': - import ctypes - from ctypes.util import find_library - libSystem = ctypes.CDLL('libSystem.dylib') - CoreServices = ctypes.CDLL(find_library('CoreServices'), - use_errno=True) - mach_absolute_time = libSystem.mach_absolute_time - mach_absolute_time.restype = ctypes.c_uint64 - absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds - absolute_to_nanoseconds.restype = ctypes.c_uint64 - absolute_to_nanoseconds.argtypes = [ctypes.c_uint64] - - def _monotonic(): - return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9 - - elif SYSTEM == 'Linux': - # from stackoverflow: - # questions/1205722/how-do-i-get-monotonic-time-durations-in-python - import ctypes - import os - - CLOCK_MONOTONIC = 1 # see - - class timespec(ctypes.Structure): - _fields_ = [ - ('tv_sec', ctypes.c_long), - ('tv_nsec', ctypes.c_long), - ] - - librt = ctypes.CDLL('librt.so.1', use_errno=True) - clock_gettime = librt.clock_gettime - clock_gettime.argtypes = [ - ctypes.c_int, ctypes.POINTER(timespec), - ] - - def _monotonic(): # noqa - t = timespec() - if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0: - errno_ = ctypes.get_errno() - raise OSError(errno_, os.strerror(errno_)) - return t.tv_sec + t.tv_nsec * 1e-9 - else: - from time import time as _monotonic -try: - from time import monotonic -except ImportError: - monotonic = _monotonic # noqa - -############## Py3 <-> Py2 ################################################### - -if PY3: # pragma: no cover - import builtins - - from queue import Queue, Empty, Full, LifoQueue - from itertools import zip_longest - from io import StringIO - - map = map - zip = zip - string = str - string_t = str - long_t = int - text_t = str - range = range - module_name_t = str - - open_fqdn = 'builtins.open' - - def items(d): - return d.items() - - def keys(d): - return d.keys() - - def values(d): - return d.values() - - def nextfun(it): - return it.__next__ - - exec_ = getattr(builtins, 'exec') - - def reraise(tp, value, tb=None): - if value.__traceback__ is not tb: - raise value.with_traceback(tb) - raise value - - class WhateverIO(StringIO): - - def write(self, data): - if isinstance(data, bytes): - data = data.encode() - StringIO.write(self, data) - -else: - import __builtin__ as builtins # noqa - from Queue import Queue, Empty, Full, LifoQueue # noqa - from itertools import ( # noqa - imap as map, - izip as zip, - izip_longest as zip_longest, - ) - from io import StringIO as WhateverIO # noqa - - string = unicode # noqa - string_t = basestring # noqa - text_t = unicode - long_t = long # noqa - range = xrange - module_name_t = str - - open_fqdn = '__builtin__.open' - - def items(d): # noqa - return d.iteritems() - - def keys(d): # noqa - return d.iterkeys() - - def values(d): # noqa - return d.itervalues() - - def nextfun(it): # noqa - return it.next - - def exec_(code, globs=None, locs=None): # pragma: no cover - """Execute code in a namespace.""" - if globs is None: - frame = sys._getframe(1) - globs = frame.f_globals - if locs is None: - locs = frame.f_locals - del frame - elif locs is None: - locs = globs - exec("""exec code in globs, locs""") - - exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""") - - -def with_metaclass(Type, skip_attrs={'__dict__', '__weakref__'}): - """Class decorator to set metaclass. - - Works with both Python 3 and Python 3 and it does not add - an extra class in the lookup order like ``six.with_metaclass`` does - (that is -- it copies the original class instead of using inheritance). - - """ - - def _clone_with_metaclass(Class): - attrs = {key: value for key, value in items(vars(Class)) - if key not in skip_attrs} - return Type(Class.__name__, Class.__bases__, attrs) - - return _clone_with_metaclass +from amqp.five import * # noqa +from amqp.five import __all__ -- cgit v1.2.1 From f80e0c349fbbc4245b9dc007b09957dc730a0718 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 21 May 2014 12:57:24 +0100 Subject: Removes kombu.utils.compat.get_errno --- kombu/async/hub.py | 5 ++--- kombu/transport/base.py | 6 ++---- kombu/utils/compat.py | 26 -------------------------- kombu/utils/eventio.py | 15 +++++++-------- 4 files changed, 11 insertions(+), 41 deletions(-) delete mode 100644 kombu/utils/compat.py (limited to 'kombu') diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 306fdd31..7b05b409 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -20,7 +20,6 @@ from amqp import promise from kombu.five import Empty, range from kombu.log import get_logger from kombu.utils import cached_property, fileno -from kombu.utils.compat import get_errno from kombu.utils.eventio import READ, WRITE, ERR, poll from .timer import Timer @@ -139,7 +138,7 @@ class Hub(object): except (MemoryError, AssertionError): raise except OSError as exc: - if get_errno(exc) == errno.ENOMEM: + if exc.errno == errno.ENOMEM: raise logger.error('Error in timer: %r', exc, exc_info=1) except Exception as exc: @@ -313,7 +312,7 @@ class Hub(object): try: next(cb) except OSError as exc: - if get_errno(exc) != errno.EBADF: + if exc.errno != errno.EBADF: raise hub_remove(fileno) except StopIteration: diff --git a/kombu/transport/base.py b/kombu/transport/base.py index c226307e..d054b1f1 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -13,7 +13,6 @@ import socket from kombu.exceptions import ChannelError, ConnectionError from kombu.message import Message from kombu.utils import cached_property -from kombu.utils.compat import get_errno __all__ = ['Message', 'StdChannel', 'Management', 'Transport'] @@ -133,8 +132,7 @@ class Transport(object): return True def _make_reader(self, connection, timeout=socket.timeout, - error=socket.error, get_errno=get_errno, - _unavail=(errno.EAGAIN, errno.EINTR)): + error=socket.error, _unavail=(errno.EAGAIN, errno.EINTR)): drain_events = connection.drain_events def _read(loop): @@ -145,7 +143,7 @@ class Transport(object): except timeout: return except error as exc: - if get_errno(exc) in _unavail: + if exc.errno in _unavail: return raise loop.call_soon(_read, loop) diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py deleted file mode 100644 index a306c672..00000000 --- a/kombu/utils/compat.py +++ /dev/null @@ -1,26 +0,0 @@ -""" -kombu.utils.compat -================== - -Helps compatibility with older Python versions. - -""" -from __future__ import absolute_import - - -############## socket.error.errno ############################################ - - -def get_errno(exc): - """:exc:`socket.error` and :exc:`IOError` first got - the ``.errno`` attribute in Py2.7""" - try: - return exc.errno - except AttributeError: - try: - # e.args = (errno, reason) - if isinstance(exc.args, tuple) and len(exc.args) == 2: - return exc.args[0] - except AttributeError: - pass - return 0 diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index fdb24b6e..6ed352d3 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -44,7 +44,6 @@ KQ_NOTE_REVOKE = getattr(__select__, 'kQ_NOTE_REVOKE', 64) from kombu.syn import detect_environment from . import fileno -from .compat import get_errno __all__ = ['poll'] @@ -64,7 +63,7 @@ class Poller(object): try: return self._poll(timeout) except Exception as exc: - if get_errno(exc) != errno.EINTR: + if exc.errno != errno.EINTR: raise @@ -77,7 +76,7 @@ class _epoll(Poller): try: self._epoll.register(fd, events) except Exception as exc: - if get_errno(exc) != errno.EEXIST: + if exc.errno != errno.EEXIST: raise def unregister(self, fd): @@ -86,7 +85,7 @@ class _epoll(Poller): except (socket.error, ValueError, KeyError, TypeError): pass except (IOError, OSError) as exc: - if get_errno(exc) != errno.ENOENT: + if exc.errno != errno.ENOENT: raise def _poll(self, timeout): @@ -198,7 +197,7 @@ class _select(Poller): try: _selectf([fd], [], [], 0) except (_selecterr, socket.error) as exc: - if get_errno(exc) in SELECT_BAD_FD: + if exc.errno in SELECT_BAD_FD: self.unregister(fd) def unregister(self, fd): @@ -207,7 +206,7 @@ class _select(Poller): except socket.error as exc: # we don't know the previous fd of this object # but it will be removed by the next poll iteration. - if get_errno(exc) in SELECT_BAD_FD: + if exc.errno in SELECT_BAD_FD: return raise self._rfd.discard(fd) @@ -220,9 +219,9 @@ class _select(Poller): self._rfd, self._wfd, self._efd, timeout, ) except (_selecterr, socket.error) as exc: - if get_errno(exc) == errno.EINTR: + if exc.errno == errno.EINTR: return - elif get_errno(exc) in SELECT_BAD_FD: + elif exc.errno in SELECT_BAD_FD: return self._remove_bad() raise -- cgit v1.2.1 From 49de25a17589d9a34defcbc1fefc45ee57c3a4ee Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 21 May 2014 13:25:18 +0100 Subject: Tests passing on Py3 --- kombu/tests/transport/virtual/test_base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index cbb1e58b..420862b3 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import sys import warnings from kombu import Connection @@ -10,6 +11,9 @@ from kombu.compression import compress from kombu.tests.case import Case, Mock, patch, redirect_stdouts +PY3 = sys.version_info[0] == 3 +PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print' + def client(**kwargs): return Connection(transport='kombu.transport.virtual:Transport', **kwargs) @@ -393,7 +397,7 @@ class test_Channel(Case): self.assertFalse(q._delivered) @patch('kombu.transport.virtual.emergency_dump_state') - @patch('__builtin__.print') + @patch(PRINT_FQDN) def test_restore_unacked_once_when_unrestored(self, print_, emergency_dump_state): q = self.channel.qos -- cgit v1.2.1 From e9e48ad72992c8a48c280165b71a84a7965f4219 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 21 May 2014 15:12:54 +0100 Subject: sanitize_url now works if host is None --- kombu/utils/url.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 3d344ebb..d7197215 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -46,7 +46,7 @@ def as_url(scheme, host=None, port=None, user=None, password=None, else: parts.extend([':', safequote(password)]) parts.append('@') - parts.append(safequote(host)) + parts.append(safequote(host) if host else '') if port: parts.extend([':', port]) parts.extend(['/', path]) -- cgit v1.2.1 From 224f16160b10c7308bb42c0800cd7623d54bde15 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 21 May 2014 16:49:21 +0100 Subject: Fixes typo in comment --- kombu/transport/virtual/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 34094d3e..b5011d98 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -529,7 +529,7 @@ class Channel(AbstractChannel, base.StdChannel): return self.typeof(exchange).deliver( message, exchange, routing_key, **kwargs ) - # anon exchange: routing_key is the destintaion queue + # anon exchange: routing_key is the destination queue return self._put(routing_key, message, **kwargs) def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): -- cgit v1.2.1 From d7fb705294375baded1b23407d29d28f44f70546 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 23 May 2014 14:25:04 +0100 Subject: eventloop() should not swallow socket.errors --- kombu/common.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'kombu') diff --git a/kombu/common.py b/kombu/common.py index cac03314..1a6e587b 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -181,8 +181,6 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False): except socket.timeout: if timeout and not ignore_timeouts: # pragma: no cover raise - except socket.error: # pragma: no cover - pass def send_reply(exchange, req, msg, -- cgit v1.2.1 From 2d43889d6ef727d67f6aed0231103132de5f228e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 23 May 2014 14:25:04 +0100 Subject: eventloop() should not swallow socket.errors --- kombu/common.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'kombu') diff --git a/kombu/common.py b/kombu/common.py index cac03314..1a6e587b 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -181,8 +181,6 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False): except socket.timeout: if timeout and not ignore_timeouts: # pragma: no cover raise - except socket.error: # pragma: no cover - pass def send_reply(exchange, req, msg, -- cgit v1.2.1 From 50e2d6c1a48c0f3d2669a877ad7e541938c6cf28 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 27 May 2014 14:33:11 +0100 Subject: Adds Makefile --- kombu/tests/case.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'kombu') diff --git a/kombu/tests/case.py b/kombu/tests/case.py index c265cb91..09102c6e 100644 --- a/kombu/tests/case.py +++ b/kombu/tests/case.py @@ -186,7 +186,3 @@ def skip_if_not_module(module, import_errors=(ImportError, )): return fun(*args, **kwargs) return _skip_if_not_module return _wrap_test - - -def skip_if_quick(fun): - return skip_if_environ('QUICKTEST')(fun) -- cgit v1.2.1 From 86274e967d900077f5e509d436d92f5a5db34b63 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 27 May 2014 14:41:20 +0100 Subject: Fixes flakes after flake8 upgrade --- kombu/async/hub.py | 30 +++++++++++++++--------------- kombu/five.py | 4 ++-- kombu/pidbox.py | 18 ++++++++++-------- kombu/syn.py | 4 ++-- kombu/tests/test_mixins.py | 2 -- kombu/tests/test_syn.py | 9 ++++++--- kombu/tests/transport/virtual/test_base.py | 4 ++-- kombu/transport/django/__init__.py | 1 - kombu/transport/mongodb.py | 3 ++- 9 files changed, 39 insertions(+), 36 deletions(-) (limited to 'kombu') diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 7b05b409..e46f7908 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -271,38 +271,38 @@ class Hub(object): item() poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 - #print('[[[HUB]]]: %s' % (self.repr_active(), )) + # print('[[[HUB]]]: %s' % (self.repr_active(), )) if readers or writers: to_consolidate = [] try: events = poll(poll_timeout) - #print('[EVENTS]: %s' % (self.nepr_events(events or []), )) + # print('[EVENTS]: %s' % (self.repr_events(events), )) except ValueError: # Issue 882 raise StopIteration() - for fileno, event in events or (): - if fileno in consolidate and \ - writers.get(fileno) is None: - to_consolidate.append(fileno) + for fd, event in events or (): + if fd in consolidate and \ + writers.get(fd) is None: + to_consolidate.append(fd) continue cb = cbargs = None if event & READ: try: - cb, cbargs = readers[fileno] + cb, cbargs = readers[fd] except KeyError: - self.remove_reader(fileno) + self.remove_reader(fd) continue elif event & WRITE: try: - cb, cbargs = writers[fileno] + cb, cbargs = writers[fd] except KeyError: - self.remove_writer(fileno) + self.remove_writer(fd) continue elif event & ERR: try: - cb, cbargs = (readers.get(fileno) or - writers.get(fileno)) + cb, cbargs = (readers.get(fd) or + writers.get(fd)) except TypeError: pass @@ -314,11 +314,11 @@ class Hub(object): except OSError as exc: if exc.errno != errno.EBADF: raise - hub_remove(fileno) + hub_remove(fd) except StopIteration: pass except Exception: - hub_remove(fileno) + hub_remove(fd) raise else: try: @@ -338,7 +338,7 @@ class Hub(object): def repr_events(self, events): from .debug import repr_events - return repr_events(self, events) + return repr_events(self, events or []) @cached_property def scheduler(self): diff --git a/kombu/five.py b/kombu/five.py index 9022b899..d83015f3 100644 --- a/kombu/five.py +++ b/kombu/five.py @@ -10,5 +10,5 @@ """ from __future__ import absolute_import -from amqp.five import * # noqa -from amqp.five import __all__ +from amqp.five import * # noqa +from amqp.five import __all__ # noqa diff --git a/kombu/pidbox.py b/kombu/pidbox.py index f6dca574..4ffb32e1 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -210,14 +210,16 @@ class Mailbox(object): def get_reply_queue(self): oid = self.oid - return Queue('%s.%s' % (oid, self.reply_exchange.name), - exchange=self.reply_exchange, - routing_key=oid, - durable=False, - auto_delete=True, - queue_arguments={ - 'x-expires': int(REPLY_QUEUE_EXPIRES * 1000), - }) + return Queue( + '%s.%s' % (oid, self.reply_exchange.name), + exchange=self.reply_exchange, + routing_key=oid, + durable=False, + auto_delete=True, + queue_arguments={ + 'x-expires': int(REPLY_QUEUE_EXPIRES * 1000), + }, + ) @cached_property def reply_queue(self): diff --git a/kombu/syn.py b/kombu/syn.py index 7f6e8099..01b4d479 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -21,7 +21,7 @@ def select_blocking_method(type): def _detect_environment(): - ## -eventlet- + # ## -eventlet- if 'eventlet' in sys.modules: try: from eventlet.patcher import is_monkey_patched as is_eventlet @@ -32,7 +32,7 @@ def _detect_environment(): except ImportError: pass - # -gevent- + # ## -gevent- if 'gevent' in sys.modules: try: from gevent import socket as _gsocket diff --git a/kombu/tests/test_mixins.py b/kombu/tests/test_mixins.py index b80f0131..53c423df 100644 --- a/kombu/tests/test_mixins.py +++ b/kombu/tests/test_mixins.py @@ -90,7 +90,6 @@ class test_ConsumerMixin(Case): def test_Consumer_context(self): c, Acons, Bcons = self._context() - _connref = _chanref = None with c.Consumer() as (conn, channel, consumer): self.assertIs(conn, c.connection) @@ -104,7 +103,6 @@ class test_ConsumerMixin(Case): self.assertIs(subcons.channel, conn.default_channel) Acons.__enter__.assert_called_with() Bcons.__enter__.assert_called_with() - _connref, _chanref = conn, channel c.on_consume_end.assert_called_with(conn, channel) diff --git a/kombu/tests/test_syn.py b/kombu/tests/test_syn.py index 551e5544..34e58035 100644 --- a/kombu/tests/test_syn.py +++ b/kombu/tests/test_syn.py @@ -38,9 +38,12 @@ class test_syn(Case): def test_detect_environment_gevent(self): with patch('gevent.socket', create=True) as m: prev, socket.socket = socket.socket, m.socket - self.assertTrue(sys.modules['gevent']) - env = syn._detect_environment() - self.assertEqual(env, 'gevent') + try: + self.assertTrue(sys.modules['gevent']) + env = syn._detect_environment() + self.assertEqual(env, 'gevent') + finally: + socket.socket = prev def test_detect_environment_no_eventlet_or_gevent(self): try: diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index 420862b3..a2186228 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -271,8 +271,8 @@ class test_Channel(Case): c.exchange_declare(n) c.queue_declare(n) c.queue_bind(n, n, n) - c.queue_bind(n, n, n) # tests code path that returns - # if queue already bound. + # tests code path that returns if queue already bound. + c.queue_bind(n, n, n) c.queue_delete(n, if_empty=True) self.assertIn(n, c.state.bindings) diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py index 4493cf19..9dde51a2 100644 --- a/kombu/transport/django/__init__.py +++ b/kombu/transport/django/__init__.py @@ -35,7 +35,6 @@ class Channel(virtual.Channel): super(Channel, self).basic_consume(queue, *args, **kwargs) def _get(self, queue): - #self.refresh_connection() m = Queue.objects.fetch(queue) if m: return loads(bytes_to_str(m)) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 5b26118a..b361ed3c 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -211,9 +211,10 @@ class Channel(virtual.Channel): self.get_broadcast().ensure_index([('queue', 1)]) self.get_routing().ensure_index([('queue', 1), ('exchange', 1)]) - #TODO Store a more complete exchange metatable in the routing collection def get_table(self, exchange): """Get table of bindings for ``exchange``.""" + # TODO Store a more complete exchange metatable in the + # routing collection localRoutes = frozenset(self.state.exchanges[exchange]['table']) brokerRoutes = self.get_messages().routing.find( {'exchange': exchange} -- cgit v1.2.1 From 3d63c6762d27a094f9cbf7252102e75fc088409d Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 May 2014 12:36:35 +0100 Subject: cosmetics --- kombu/utils/json.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'kombu') diff --git a/kombu/utils/json.py b/kombu/utils/json.py index 9dd03429..a5227467 100644 --- a/kombu/utils/json.py +++ b/kombu/utils/json.py @@ -17,14 +17,11 @@ class JSONEncoder(json.JSONEncoder): def default(self, obj, _super=json.JSONEncoder.default): try: - _super(self, obj) - except TypeError: - try: - reducer = obj.__json__ - except AttributeError: - raise - else: - return reducer() + reducer = obj.__json__ + except AttributeError: + return _super(self, obj) + else: + return reducer() def dumps(s, _dumps=json.dumps, cls=JSONEncoder): -- cgit v1.2.1 From 3da895e754d430b619ceddbcb7ad5668fe1a617a Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 30 May 2014 13:28:37 +0100 Subject: Redis: Use super._restore when ack_emulation disabled. Closes celery/celery#1776 --- kombu/transport/redis.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kombu') diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 30015a60..f9afb01f 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -475,6 +475,8 @@ class Channel(virtual.Channel): crit('Could not restore message: %r', payload, exc_info=True) def _restore(self, message, leftmost=False): + if not self.ack_emulation: + return super(Channel, self)._restore(message) tag = message.delivery_tag with self.conn_or_acquire() as client: P, _ = client.pipeline() \ -- cgit v1.2.1 From 778972fbde7e4c0de2dcd6d557eb5fa5e27ea3ca Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 30 May 2014 13:28:37 +0100 Subject: Redis: Use super._restore when ack_emulation disabled. Closes celery/celery#1776 --- kombu/transport/redis.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kombu') diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index c3f6decd..8d13f36c 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -475,6 +475,8 @@ class Channel(virtual.Channel): crit('Could not restore message: %r', payload, exc_info=True) def _restore(self, message, leftmost=False): + if not self.ack_emulation: + return super(Channel, self)._restore(message) tag = message.delivery_tag with self.conn_or_acquire() as client: P, _ = client.pipeline() \ -- cgit v1.2.1 From 3dad9e7aac50293dc2628d9b6e93e9eec9e7ff0e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 18:19:14 +0100 Subject: parse_url function should not know about mongodb, and the transport already sets can_parse_url --- kombu/tests/test_connection.py | 4 ---- kombu/utils/url.py | 10 +--------- 2 files changed, 1 insertion(+), 13 deletions(-) (limited to 'kombu') diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 58790db0..147faaf0 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -31,10 +31,6 @@ class test_connection_utils(Case): result = parse_url(self.url) self.assertDictEqual(result, self.expected) - def test_parse_url_mongodb(self): - result = parse_url('mongodb://example.com/') - self.assertEqual(result['hostname'], 'example.com/') - def test_parse_generated_as_uri(self): conn = Connection(self.url) info = conn.info() diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 8839fca2..3eebe345 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -14,17 +14,9 @@ def _parse_url(url): schemeless = url[len(scheme) + 3:] # parse with HTTP URL semantics parts = urlparse('http://' + schemeless) - - # The first pymongo.Connection() argument (host) can be - # a mongodb connection URI. If this is the case, don't - # use port but let pymongo get the port(s) from the URI instead. - # This enables the use of replica sets and sharding. - # See pymongo.Connection() for more info. - port = scheme != 'mongodb' and parts.port or None - hostname = schemeless if scheme == 'mongodb' else parts.hostname path = parts.path or '' path = path[1:] if path and path[0] == '/' else path - return (scheme, unquote(hostname or '') or None, port, + return (scheme, unquote(parts.hostname or '') or None, parts.port, unquote(parts.username or '') or None, unquote(parts.password or '') or None, unquote(path or '') or None, -- cgit v1.2.1 From 85c0a15c687b5b5ccd44681dae356a718041b757 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 18:41:44 +0100 Subject: Adds sanitize_url and as_url to kombu.utils.url Conflicts: kombu/utils/url.py --- kombu/connection.py | 40 +++++++++++----------------------------- kombu/utils/url.py | 31 +++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 31 deletions(-) (limited to 'kombu') diff --git a/kombu/connection.py b/kombu/connection.py index 85b8f5e9..c49c5b4b 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -11,13 +11,8 @@ import os import socket from contextlib import contextmanager -from functools import partial from itertools import count, cycle from operator import itemgetter -try: - from urllib.parse import quote -except ImportError: # Py2 - from urllib import quote # noqa # jython breaks on relative import for .exceptions for some reason # (Issue #112) @@ -28,7 +23,7 @@ from .transport import get_transport_cls, supports_librabbitmq from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq from .utils.compat import OrderedDict from .utils.functional import lazy -from .utils.url import parse_url, urlparse +from .utils.url import as_url, parse_url, quote, urlparse __all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] @@ -569,36 +564,23 @@ class Connection(object): self.password, self.virtual_host, self.port, repr(self.transport_options)) - def as_uri(self, include_password=False, mask=''): + def as_uri(self, include_password=False, mask='**', + getfields=itemgetter('port', 'userid', 'password', + 'virtual_host', 'transport')): """Convert connection parameters to URL form.""" hostname = self.hostname or 'localhost' if self.transport.can_parse_url: if self.uri_prefix: return '%s+%s' % (self.uri_prefix, hostname) return self.hostname - quoteS = partial(quote, safe='') # strict quote fields = self.info() - port, userid, password, transport = itemgetter( - 'port', 'userid', 'password', 'transport' - )(fields) - url = '%s://' % transport - if userid or password: - if userid: - url += quoteS(userid) - if password: - if include_password: - url += ':' + quoteS(password) - else: - url += ':' + mask if mask else '' - url += '@' - url += quoteS(fields['hostname']) - if port: - url += ':%s' % (port, ) - - url += '/' + quote(fields['virtual_host']) - if self.uri_prefix: - return '%s+%s' % (self.uri_prefix, url) - return url + port, userid, password, vhost, transport = getfields(fields) + scheme = ('{0}+{1}'.format(self.uri_prefix, transport) + if self.uri_prefix else transport) + return as_url( + scheme, hostname, port, userid, password, quote(vhost), + sanitize=not include_password, mask=mask, + ) def Pool(self, limit=None, preload=None): """Pool of connections. diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 3eebe345..adc03004 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -1,13 +1,17 @@ from __future__ import absolute_import +from functools import partial + try: - from urllib.parse import unquote, urlparse, parse_qsl + from urllib.parse import parse_qsl, quote, unquote, urlparse except ImportError: - from urllib import unquote # noqa + from urllib import quote, unquote # noqa from urlparse import urlparse, parse_qsl # noqa from . import kwdict +safequote = partial(quote, safe='') + def _parse_url(url): scheme = urlparse(url).scheme @@ -28,3 +32,26 @@ def parse_url(url): return dict(transport=scheme, hostname=host, port=port, userid=user, password=password, virtual_host=path, **query) + + +def as_url(scheme, host=None, port=None, user=None, password=None, + path=None, query=None, sanitize=False, mask='**'): + parts = ['{0}://'.format(scheme)] + if user or password: + if user: + parts.append(safequote(user)) + if password: + if sanitize: + parts.extend([':', mask] if mask else [':']) + else: + parts.extend([':', safequote(password)]) + parts.append('@') + parts.append(safequote(host)) + if port: + parts.extend([':', port]) + parts.extend(['/', path]) + return ''.join(str(part) for part in parts if part) + + +def sanitize_url(url, mask='**'): + return as_url(*_parse_url(url), sanitize=True, mask=mask) -- cgit v1.2.1 From a9c197b1c06ee5f14850dfb78ce6c38077078b2f Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 19:00:24 +0100 Subject: Adds maybe_sanitize_url to kombu.utils.url Conflicts: kombu/utils/url.py --- kombu/utils/url.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'kombu') diff --git a/kombu/utils/url.py b/kombu/utils/url.py index adc03004..82e061cd 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -9,6 +9,7 @@ except ImportError: from urlparse import urlparse, parse_qsl # noqa from . import kwdict +from kombu.five import string_t safequote = partial(quote, safe='') @@ -55,3 +56,9 @@ def as_url(scheme, host=None, port=None, user=None, password=None, def sanitize_url(url, mask='**'): return as_url(*_parse_url(url), sanitize=True, mask=mask) + + +def maybe_sanitize_url(url, mask='**'): + if isinstance(url, string_t) and '://' in url: + return sanitize_url(url, mask) + return url -- cgit v1.2.1 From e7707f97182c893dca757c6dfb075986d223c1d0 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 20 May 2014 19:05:01 +0100 Subject: Tests passing --- kombu/tests/test_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 147faaf0..6bd3303f 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -17,7 +17,7 @@ class test_connection_utils(Case): def setUp(self): self.url = 'amqp://user:pass@localhost:5672/my/vhost' - self.nopass = 'amqp://user@localhost:5672/my/vhost' + self.nopass = 'amqp://user:**@localhost:5672/my/vhost' self.expected = { 'transport': 'amqp', 'userid': 'user', -- cgit v1.2.1 From 40d074af95cd6f48efca619c772a009a0008607c Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 21 May 2014 15:12:54 +0100 Subject: sanitize_url now works if host is None --- kombu/utils/url.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/utils/url.py b/kombu/utils/url.py index 82e061cd..f93282d3 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -47,7 +47,7 @@ def as_url(scheme, host=None, port=None, user=None, password=None, else: parts.extend([':', safequote(password)]) parts.append('@') - parts.append(safequote(host)) + parts.append(safequote(host) if host else '') if port: parts.extend([':', port]) parts.extend(['/', path]) -- cgit v1.2.1 From c37a005caaed4a9a53b2bd87443b36f9797c826c Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 21 May 2014 16:49:21 +0100 Subject: Fixes typo in comment --- kombu/transport/virtual/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index cb844de9..ddcca471 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -520,7 +520,7 @@ class Channel(AbstractChannel, base.StdChannel): return self.typeof(exchange).deliver( message, exchange, routing_key, **kwargs ) - # anon exchange: routing_key is the destintaion queue + # anon exchange: routing_key is the destination queue return self._put(routing_key, message, **kwargs) def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): -- cgit v1.2.1 From d006baf16947607a6a5bafc71562c6c88e708871 Mon Sep 17 00:00:00 2001 From: "Latitia M. Haskins" Date: Tue, 27 May 2014 12:15:08 -0400 Subject: Issue #355: Check if the userid is populated for couchdb credentials --- kombu/transport/couchdb.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index a4d87d99..0ccceffb 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -79,7 +79,9 @@ class Channel(virtual.Channel): port)) # Use username and password if avaliable try: - server.resource.credentials = (conninfo.userid, conninfo.password) + if conninfo.userid: + server.resource.credentials = (conninfo.userid, + conninfo.password) except AttributeError: pass try: -- cgit v1.2.1 From fb2bf278e4da999f410776ab07c3fc5f5ae64359 Mon Sep 17 00:00:00 2001 From: "Latitia M. Haskins" Date: Tue, 27 May 2014 12:15:08 -0400 Subject: Issue #355: Check if the userid is populated for couchdb credentials --- kombu/transport/couchdb.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index 009dbbdc..28a8598b 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -80,7 +80,9 @@ class Channel(virtual.Channel): port)) # Use username and password if avaliable try: - server.resource.credentials = (conninfo.userid, conninfo.password) + if conninfo.userid: + server.resource.credentials = (conninfo.userid, + conninfo.password) except AttributeError: pass try: -- cgit v1.2.1 From b9c897e96ca085dd0206e59d5a2ec5d08b1957f3 Mon Sep 17 00:00:00 2001 From: Felix Schwarz Date: Fri, 18 Apr 2014 15:19:50 +0200 Subject: SQLAlchemy Transport treats OperationalError as connection error (e.g. MySQL connection drops due to inactivity or a MySQL server restart) --- kombu/transport/sqlalchemy/__init__.py | 1 + 1 file changed, 1 insertion(+) (limited to 'kombu') diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index 0dd5a9da..27c6e65d 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -153,6 +153,7 @@ class Transport(virtual.Transport): default_port = 0 driver_type = 'sql' driver_name = 'sqlalchemy' + connection_errors = (OperationalError, ) def driver_version(self): import sqlalchemy -- cgit v1.2.1 From 5ca5df46a6ef50d2fb6aa9729ffa2ab3dfaf637b Mon Sep 17 00:00:00 2001 From: Felix Schwarz Date: Fri, 18 Apr 2014 15:19:50 +0200 Subject: SQLAlchemy Transport treats OperationalError as connection error (e.g. MySQL connection drops due to inactivity or a MySQL server restart) --- kombu/transport/sqlalchemy/__init__.py | 1 + 1 file changed, 1 insertion(+) (limited to 'kombu') diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index c085b469..3aab1557 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -153,6 +153,7 @@ class Transport(virtual.Transport): default_port = 0 driver_type = 'sql' driver_name = 'sqlalchemy' + connection_errors = (OperationalError, ) def driver_version(self): import sqlalchemy -- cgit v1.2.1 From 96ed178cda629217a8ae81703d1866676f8cc916 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 2 Jun 2014 16:56:34 +0100 Subject: Bumps version to 3.0.17 and updates Changelog --- kombu/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/__init__.py b/kombu/__init__.py index deac30c4..cb47f1a9 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -7,7 +7,7 @@ version_info_t = namedtuple( 'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'), ) -VERSION = version_info_t(3, 0, 16, '', '') +VERSION = version_info_t(3, 0, 17, '', '') __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' -- cgit v1.2.1 From 241c4c0c0d8845f83356d7aa4caf40952faf4ea2 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 2 Jun 2014 17:17:20 +0100 Subject: amqplib can now be imported without amqplib dependency. Closes #316 --- kombu/transport/amqplib.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) (limited to 'kombu') diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index fff82a1f..2a8e103a 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -17,11 +17,26 @@ except ImportError: pass from struct import unpack -from amqplib import client_0_8 as amqp -from amqplib.client_0_8 import transport -from amqplib.client_0_8.channel import Channel as _Channel -from amqplib.client_0_8.exceptions import AMQPConnectionException -from amqplib.client_0_8.exceptions import AMQPChannelException +try: + from amqplib import client_0_8 as amqp + from amqplib.client_0_8 import transport + from amqplib.client_0_8.channel import Channel as _Channel + from amqplib.client_0_8.exceptions import AMQPConnectionException + from amqplib.client_0_8.exceptions import AMQPChannelException +except ImportError: # pragma: no cover + + class NA(object): + pass + + class NAx(object): + pass + amqp = NA + amqp.Connection = NA + transport = _Channel = NA # noqa + # Sphinx crashes if this is NA, must be different class + transport.TCPTransport = transport.SSLTransport = NAx + AMQPConnectionException = AMQPChannelException = NA # noqa + from kombu.five import items from kombu.utils.encoding import str_to_bytes @@ -321,6 +336,9 @@ class Transport(base.Transport): self.client = client self.default_port = kwargs.get('default_port') or self.default_port + if amqp is NA: + raise ImportError('Missing amqplib library (pip install amqplib)') + def create_channel(self, connection): return connection.channel() -- cgit v1.2.1 From b0ce0a4524249db654b5194bf27f3344ad0879e6 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 2 Jun 2014 17:17:50 +0100 Subject: Redis: raise proper error if redis library missing --- kombu/transport/redis.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kombu') diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 8d13f36c..414c8e08 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -912,6 +912,8 @@ class Transport(virtual.Transport): driver_name = 'redis' def __init__(self, *args, **kwargs): + if redis is None: + raise ImportError('Missing redis library (pip install redis)') super(Transport, self).__init__(*args, **kwargs) # Get redis-py exceptions. -- cgit v1.2.1 From 3624a37170847d1872b30bd863add51983a54599 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 2 Jun 2014 17:18:07 +0100 Subject: couchdb can now be imported without couchdb dependency --- kombu/transport/couchdb.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) (limited to 'kombu') diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index 28a8598b..99d1362e 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -11,7 +11,6 @@ CouchDB transport. from __future__ import absolute_import import socket -import couchdb from anyjson import loads, dumps @@ -21,6 +20,11 @@ from kombu.utils.encoding import bytes_to_str from . import virtual +try: + import couchdb +except ImportError: # pragma: no cover + couchdb = None # noqa + DEFAULT_PORT = 5984 DEFAULT_DATABASE = 'kombu_default' @@ -112,20 +116,27 @@ class Transport(virtual.Transport): connection_errors = ( virtual.Transport.connection_errors + ( socket.error, - couchdb.HTTPError, - couchdb.ServerError, - couchdb.Unauthorized) + getattr(couchdb, 'HTTPError', None), + getattr(couchdb, 'ServerError', None), + getattr(couchdb, 'Unauthorized', None), + ) ) channel_errors = ( virtual.Transport.channel_errors + ( - couchdb.HTTPError, - couchdb.ServerError, - couchdb.PreconditionFailed, - couchdb.ResourceConflict, - couchdb.ResourceNotFound) + getattr(couchdb, 'HTTPError', None), + getattr(couchdb, 'ServerError', None), + getattr(couchdb, 'PreconditionFailed', None), + getattr(couchdb, 'ResourceConflict', None), + getattr(couchdb, 'ResourceNotFound', None), + ) ) driver_type = 'couchdb' driver_name = 'couchdb' + def __init__(self, *args, **kwargs): + if couchdb is None: + raise ImportError('Missing couchdb library (pip install couchdb)') + super(Transport, self).__init__(*args, **kwargs) + def driver_version(self): return couchdb.__version__ -- cgit v1.2.1 From 1dddd80a6e1f2c6d1f654567266a45cd5ba90a1a Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 2 Jun 2014 17:18:24 +0100 Subject: beanstalk can now be imported without beanstalkc dependency --- kombu/transport/beanstalk.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) (limited to 'kombu') diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index 9dff8b49..4e73bbca 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -10,7 +10,6 @@ Beanstalk transport. """ from __future__ import absolute_import -import beanstalkc import socket from anyjson import loads, dumps @@ -20,6 +19,11 @@ from kombu.utils.encoding import bytes_to_str from . import virtual +try: + import beanstalkc +except ImportError: # pragma: no cover + beanstalkc = None # noqa + DEFAULT_PORT = 11300 __author__ = 'David Ziegler ' @@ -127,16 +131,25 @@ class Transport(virtual.Transport): default_port = DEFAULT_PORT connection_errors = ( virtual.Transport.connection_errors + ( - socket.error, beanstalkc.SocketError, IOError) + socket.error, IOError, + getattr(beanstalkc, 'SocketError', None), + ) ) channel_errors = ( virtual.Transport.channel_errors + ( socket.error, IOError, - beanstalkc.SocketError, - beanstalkc.BeanstalkcException) + getattr(beanstalkc, 'SocketError', None), + getattr(beanstalkc, 'BeanstalkcException', None), + ) ) driver_type = 'beanstalk' driver_name = 'beanstalkc' + def __init__(self, *args, **kwargs): + if beanstalkc is None: + raise ImportError( + 'Missing beanstalkc library (pip install beanstalkc)') + super(Transport, self).__init__(*args, **kwargs) + def driver_version(self): return beanstalkc.__version__ -- cgit v1.2.1 From 12b78204d4ba115cf4ef3678c37c10262e7a5d31 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 2 Jun 2014 17:31:52 +0100 Subject: Tests passing --- kombu/tests/transport/test_redis.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'kombu') diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index f9dc685a..6b142d1f 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -220,6 +220,7 @@ class Transport(redis.Transport): class test_Channel(Case): + @skip_if_not_module('redis') def setUp(self): self.connection = self.create_connection() self.channel = self.connection.default_channel @@ -788,6 +789,7 @@ class test_Channel(Case): class test_Redis(Case): + @skip_if_not_module('redis') def setUp(self): self.connection = Connection(transport=Transport) self.exchange = Exchange('test_Redis', type='direct') @@ -944,6 +946,7 @@ def _redis_modules(): class test_MultiChannelPoller(Case): + @skip_if_not_module('redis') def setUp(self): self.Poller = redis.MultiChannelPoller -- cgit v1.2.1 From 19e83733c26d907522616e3ab3c7e3b323eaf50e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 4 Jun 2014 14:14:24 +0100 Subject: Hub._ready is now a set --- kombu/async/hub.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'kombu') diff --git a/kombu/async/hub.py b/kombu/async/hub.py index e46f7908..408a065b 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -15,7 +15,7 @@ from contextlib import contextmanager from time import sleep from types import GeneratorType as generator -from amqp import promise +from amqp.promise import Thenable, promise from kombu.five import Empty, range from kombu.log import get_logger @@ -79,7 +79,7 @@ class Hub(object): self.writers = {} self.on_tick = set() self.on_close = set() - self._ready = deque() + self._ready = set() self._running = False self._loop = None @@ -183,9 +183,10 @@ class Hub(object): self._loop = None def call_soon(self, callback, *args): - handle = promise(callback, args) - self._ready.append(handle) - return handle + if not isinstance(callback, Thenable): + callback = promise(callback, args) + self._ready.add(callback) + return callback def call_later(self, delay, callback, *args): return self.timer.call_after(delay, callback, args) @@ -266,7 +267,7 @@ class Hub(object): tick_callback() while todo: - item = todo.popleft() + item = todo.pop() if item: item() -- cgit v1.2.1 From d1b017b99c0465a716d711f01eefdf58eca8e8aa Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 5 Jun 2014 16:04:50 +0100 Subject: Optimizes maybe_declare for transient queues --- kombu/common.py | 45 ++++++++++++++++++++++++------------------- kombu/connection.py | 1 + kombu/entity.py | 2 +- kombu/messaging.py | 2 +- kombu/tests/test_common.py | 14 ++++++++------ kombu/tests/test_entities.py | 2 +- kombu/tests/test_messaging.py | 2 +- 7 files changed, 38 insertions(+), 30 deletions(-) (limited to 'kombu') diff --git a/kombu/common.py b/kombu/common.py index 1a6e587b..799d0b64 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -22,7 +22,6 @@ from amqp import RecoverableConnectionError from .entity import Exchange, Queue from .five import range from .log import get_logger -from .messaging import Consumer as _Consumer from .serialization import registry as serializers from .utils import uuid @@ -91,33 +90,39 @@ def declaration_cached(entity, channel): def maybe_declare(entity, channel=None, retry=False, **retry_policy): - if not entity.is_bound: - assert channel - entity = entity.bind(channel) + is_bound = entity.is_bound + + if channel is None: + assert is_bound + channel = entity.channel + + declared = ident = None + if channel.connection and entity.can_cache_declaration: + declared = channel.connection.client.declared_entities + ident = hash(entity) + if ident in declared: + return False + + entity = entity if is_bound else entity.bind(channel) if retry: - return _imaybe_declare(entity, **retry_policy) - return _maybe_declare(entity) + return _imaybe_declare(entity, declared, ident, channel, **retry_policy) + return _maybe_declare(entity, declared, ident, channel) -def _maybe_declare(entity): - channel = entity.channel +def _maybe_declare(entity, declared, ident, channel): + channel = channel or entity.channel if not channel.connection: raise RecoverableConnectionError('channel disconnected') - if entity.can_cache_declaration: - declared = channel.connection.client.declared_entities - ident = hash(entity) - if ident not in declared: - entity.declare() - declared.add(ident) - return True - return False entity.declare() + if declared is not None and ident: + declared.add(ident) return True -def _imaybe_declare(entity, **retry_policy): +def _imaybe_declare(entity, declared, ident, channel, **retry_policy): return entity.channel.connection.client.ensure( - entity, _maybe_declare, **retry_policy)(entity) + entity, _maybe_declare, **retry_policy)( + entity, declared, ident, channel) def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): @@ -138,8 +143,8 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): def itermessages(conn, channel, queue, limit=1, timeout=None, - Consumer=_Consumer, callbacks=None, **kwargs): - return drain_consumer(Consumer(channel, queues=[queue], **kwargs), + callbacks=None, **kwargs): + return drain_consumer(conn.Consumer(channel, queues=[queue], **kwargs), limit=limit, timeout=timeout, callbacks=callbacks) diff --git a/kombu/connection.py b/kombu/connection.py index 63b4b685..b18d67d3 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -194,6 +194,7 @@ class Connection(object): """Switch connection parameters to use a new URL (does not reconnect)""" self.close() + self.declared_entities.clear() self._closed = False self._init_params(**dict(self._initial_params, **parse_url(url))) diff --git a/kombu/entity.py b/kombu/entity.py index fda53bef..3856a706 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -672,7 +672,7 @@ class Queue(MaybeChannelBound): @property def can_cache_declaration(self): - return self.durable and not self.auto_delete + return True @classmethod def from_dict(self, queue, **options): diff --git a/kombu/messaging.py b/kombu/messaging.py index 98d59d45..8b923950 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -11,6 +11,7 @@ import numbers from itertools import count +from .common import maybe_declare from .compression import compress from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES @@ -107,7 +108,6 @@ class Producer(object): """Declare the exchange if it hasn't already been declared during this session.""" if entity: - from .common import maybe_declare return maybe_declare(entity, self.channel, retry, **retry_policy) def publish(self, body, routing_key=None, delivery_mode=None, diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 34406992..c4eebb71 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -105,6 +105,8 @@ class test_maybe_declare(Case): def test_with_retry(self): channel = Mock() + client = channel.connection.client = Mock() + client.declared_entities = set() entity = Mock() entity.can_cache_declaration = True entity.is_bound = True @@ -265,8 +267,8 @@ class test_itermessages(Case): conn = self.MockConnection() channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) ret = next(it) self.assertTupleEqual(ret, ('body', 'message')) @@ -279,8 +281,8 @@ class test_itermessages(Case): conn.should_raise_timeout = True channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) with self.assertRaises(StopIteration): next(it) @@ -291,8 +293,8 @@ class test_itermessages(Case): deque_instance.popleft.side_effect = IndexError() conn = self.MockConnection() channel = Mock() - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) with self.assertRaises(StopIteration): next(it) diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 165160f1..0ce359d4 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -285,7 +285,7 @@ class test_Queue(Case): def test_can_cache_declaration(self): self.assertTrue(Queue('a', durable=True).can_cache_declaration) - self.assertFalse(Queue('a', durable=False).can_cache_declaration) + self.assertTrue(Queue('a', durable=False).can_cache_declaration) def test_eq(self): q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx') diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 877d36cc..6bcf89b3 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -36,7 +36,7 @@ class test_Producer(Case): p = Producer(None) self.assertFalse(p._channel) - @patch('kombu.common.maybe_declare') + @patch('kombu.messaging.maybe_declare') def test_maybe_declare(self, maybe_declare): p = self.connection.Producer() q = Queue('foo') -- cgit v1.2.1 From 244b882e0c103b915e98de6ebb7d0cb026974515 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 9 Jun 2014 13:53:24 +0100 Subject: Also cache durable and auto_delete entities --- kombu/entity.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'kombu') diff --git a/kombu/entity.py b/kombu/entity.py index 3856a706..53777c68 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -288,7 +288,7 @@ class Exchange(MaybeChannelBound): @property def can_cache_declaration(self): - return self.durable and not self.auto_delete + return True class binding(object): -- cgit v1.2.1