diff options
author | Brian Bouterse <bmbouter@gmail.com> | 2014-06-11 09:21:03 -0400 |
---|---|---|
committer | Brian Bouterse <bmbouter@gmail.com> | 2014-06-11 09:21:03 -0400 |
commit | c569d35c30284bb3deb228d7babd2ff578b76f2e (patch) | |
tree | 4540a84372cfb2d9c6e7d19dd8f90ebbee17e43a | |
parent | 286672f2eca9f497c44e34f1535cf5d166f0813a (diff) | |
parent | 244b882e0c103b915e98de6ebb7d0cb026974515 (diff) | |
download | kombu-c569d35c30284bb3deb228d7babd2ff578b76f2e.tar.gz |
Merge branch 'master' into qpid-transport
-rw-r--r-- | AUTHORS | 2 | ||||
-rw-r--r-- | Changelog | 28 | ||||
-rw-r--r-- | docs/reference/index.rst | 2 | ||||
-rw-r--r-- | kombu/async/hub.py | 13 | ||||
-rw-r--r-- | kombu/common.py | 45 | ||||
-rw-r--r-- | kombu/connection.py | 1 | ||||
-rw-r--r-- | kombu/entity.py | 4 | ||||
-rw-r--r-- | kombu/messaging.py | 2 | ||||
-rw-r--r-- | kombu/tests/test_common.py | 14 | ||||
-rw-r--r-- | kombu/tests/test_entities.py | 2 | ||||
-rw-r--r-- | kombu/tests/test_messaging.py | 2 | ||||
-rw-r--r-- | kombu/tests/transport/test_redis.py | 3 | ||||
-rw-r--r-- | kombu/transport/amqplib.py | 28 | ||||
-rw-r--r-- | kombu/transport/beanstalk.py | 21 | ||||
-rw-r--r-- | kombu/transport/couchdb.py | 33 | ||||
-rw-r--r-- | kombu/transport/redis.py | 4 | ||||
-rw-r--r-- | kombu/transport/sqlalchemy/__init__.py | 1 | ||||
-rw-r--r-- | kombu/utils/json.py | 13 | ||||
-rw-r--r-- | requirements/extras/librabbitmq.txt | 2 | ||||
-rw-r--r-- | tox.ini | 59 |
20 files changed, 213 insertions, 66 deletions
@@ -38,6 +38,7 @@ Dustin J. Mitchell <dustin@mozilla.com> Ephemera <obliviscence+git@gmail.com> Eric Reynolds <ereynolds@opendns.com> Fabrice Rabaute <fabrice@expa.com> +Felix Schwarz <felix.schwarz@oss.schwarz.eu> Fernando Jorge Mota <f.j.mota13@gmail.com> Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org> Florian Munz <surf@theflow.de> @@ -62,6 +63,7 @@ Joseph Crosland <jcrosland@flumotion.com> Keith Fitzgerald <ghostrocket@me.com> Kevin McCarthy <me@kevinmccarthy.org> Kevin McDonald <k3vinmcdonald@gmail.com> +Latitia M. Haskins <lhaskins@jetsonsys.com> Mahendra M <Mahendra_M@infosys.com> Marcin Lulek (ergo) <info@webreactor.eu> Mark Lavin <mlavin@caktusgroup.com> @@ -4,6 +4,34 @@ Change history ================ +.. _version-3.0.17: + +3.0.17 +====== +:release-date: 2014-06-02 06:00 P.M UTC +:release-by: Ask Solem + +- ``kombu[librabbitmq]`` now depends on librabbitmq 1.5.2. + +- Async: Event loop now selectively removes file descriptors for the mode + it failed in, and keeps others (e.g read vs write). + + Fix contributed by Roger Hu. + +- CouchDB: Now works without userid set. + + Fix contributed by Latitia M. Haskins. + +- SQLAlchemy: Now supports recovery from connection errors. + + Contributed by Felix Schwarz. + +- Redis: Restore at shutdown now works when ack emulation is disabled. + +- :func:`kombu.common.eventloop` accidentally swallowed socket errors. + +- Adds :func:`kombu.utils.url.sanitize_url` + .. _version-3.0.16: 3.0.16 diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 965bb1d6..202c733a 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -27,7 +27,7 @@ kombu.async.hub kombu.async.semaphore kombu.async.timer - kombu.async..debug + kombu.async.debug kombu.transport kombu.transport.pyamqp kombu.transport.librabbitmq diff --git a/kombu/async/hub.py b/kombu/async/hub.py index e46f7908..408a065b 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -15,7 +15,7 @@ from contextlib import contextmanager from time import sleep from types import GeneratorType as generator -from amqp import promise +from amqp.promise import Thenable, promise from kombu.five import Empty, range from kombu.log import get_logger @@ -79,7 +79,7 @@ class Hub(object): self.writers = {} self.on_tick = set() self.on_close = set() - self._ready = deque() + self._ready = set() self._running = False self._loop = None @@ -183,9 +183,10 @@ class Hub(object): self._loop = None def call_soon(self, callback, *args): - handle = promise(callback, args) - self._ready.append(handle) - return handle + if not isinstance(callback, Thenable): + callback = promise(callback, args) + self._ready.add(callback) + return callback def call_later(self, delay, callback, *args): return self.timer.call_after(delay, callback, args) @@ -266,7 +267,7 @@ class Hub(object): tick_callback() while todo: - item = todo.popleft() + item = todo.pop() if item: item() diff --git a/kombu/common.py b/kombu/common.py index 1a6e587b..799d0b64 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -22,7 +22,6 @@ from amqp import RecoverableConnectionError from .entity import Exchange, Queue from .five import range from .log import get_logger -from .messaging import Consumer as _Consumer from .serialization import registry as serializers from .utils import uuid @@ -91,33 +90,39 @@ def declaration_cached(entity, channel): def maybe_declare(entity, channel=None, retry=False, **retry_policy): - if not entity.is_bound: - assert channel - entity = entity.bind(channel) + is_bound = entity.is_bound + + if channel is None: + assert is_bound + channel = entity.channel + + declared = ident = None + if channel.connection and entity.can_cache_declaration: + declared = channel.connection.client.declared_entities + ident = hash(entity) + if ident in declared: + return False + + entity = entity if is_bound else entity.bind(channel) if retry: - return _imaybe_declare(entity, **retry_policy) - return _maybe_declare(entity) + return _imaybe_declare(entity, declared, ident, channel, **retry_policy) + return _maybe_declare(entity, declared, ident, channel) -def _maybe_declare(entity): - channel = entity.channel +def _maybe_declare(entity, declared, ident, channel): + channel = channel or entity.channel if not channel.connection: raise RecoverableConnectionError('channel disconnected') - if entity.can_cache_declaration: - declared = channel.connection.client.declared_entities - ident = hash(entity) - if ident not in declared: - entity.declare() - declared.add(ident) - return True - return False entity.declare() + if declared is not None and ident: + declared.add(ident) return True -def _imaybe_declare(entity, **retry_policy): +def _imaybe_declare(entity, declared, ident, channel, **retry_policy): return entity.channel.connection.client.ensure( - entity, _maybe_declare, **retry_policy)(entity) + entity, _maybe_declare, **retry_policy)( + entity, declared, ident, channel) def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): @@ -138,8 +143,8 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): def itermessages(conn, channel, queue, limit=1, timeout=None, - Consumer=_Consumer, callbacks=None, **kwargs): - return drain_consumer(Consumer(channel, queues=[queue], **kwargs), + callbacks=None, **kwargs): + return drain_consumer(conn.Consumer(channel, queues=[queue], **kwargs), limit=limit, timeout=timeout, callbacks=callbacks) diff --git a/kombu/connection.py b/kombu/connection.py index 513d64b2..8e770a3b 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -194,6 +194,7 @@ class Connection(object): """Switch connection parameters to use a new URL (does not reconnect)""" self.close() + self.declared_entities.clear() self._closed = False self._init_params(**dict(self._initial_params, **parse_url(url))) diff --git a/kombu/entity.py b/kombu/entity.py index fda53bef..53777c68 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -288,7 +288,7 @@ class Exchange(MaybeChannelBound): @property def can_cache_declaration(self): - return self.durable and not self.auto_delete + return True class binding(object): @@ -672,7 +672,7 @@ class Queue(MaybeChannelBound): @property def can_cache_declaration(self): - return self.durable and not self.auto_delete + return True @classmethod def from_dict(self, queue, **options): diff --git a/kombu/messaging.py b/kombu/messaging.py index 98d59d45..8b923950 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -11,6 +11,7 @@ import numbers from itertools import count +from .common import maybe_declare from .compression import compress from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES @@ -107,7 +108,6 @@ class Producer(object): """Declare the exchange if it hasn't already been declared during this session.""" if entity: - from .common import maybe_declare return maybe_declare(entity, self.channel, retry, **retry_policy) def publish(self, body, routing_key=None, delivery_mode=None, diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 34406992..c4eebb71 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -105,6 +105,8 @@ class test_maybe_declare(Case): def test_with_retry(self): channel = Mock() + client = channel.connection.client = Mock() + client.declared_entities = set() entity = Mock() entity.can_cache_declaration = True entity.is_bound = True @@ -265,8 +267,8 @@ class test_itermessages(Case): conn = self.MockConnection() channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) ret = next(it) self.assertTupleEqual(ret, ('body', 'message')) @@ -279,8 +281,8 @@ class test_itermessages(Case): conn.should_raise_timeout = True channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) with self.assertRaises(StopIteration): next(it) @@ -291,8 +293,8 @@ class test_itermessages(Case): deque_instance.popleft.side_effect = IndexError() conn = self.MockConnection() channel = Mock() - it = common.itermessages(conn, channel, 'q', limit=1, - Consumer=MockConsumer) + conn.Consumer = MockConsumer + it = common.itermessages(conn, channel, 'q', limit=1) with self.assertRaises(StopIteration): next(it) diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 165160f1..0ce359d4 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -285,7 +285,7 @@ class test_Queue(Case): def test_can_cache_declaration(self): self.assertTrue(Queue('a', durable=True).can_cache_declaration) - self.assertFalse(Queue('a', durable=False).can_cache_declaration) + self.assertTrue(Queue('a', durable=False).can_cache_declaration) def test_eq(self): q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx') diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 877d36cc..6bcf89b3 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -36,7 +36,7 @@ class test_Producer(Case): p = Producer(None) self.assertFalse(p._channel) - @patch('kombu.common.maybe_declare') + @patch('kombu.messaging.maybe_declare') def test_maybe_declare(self, maybe_declare): p = self.connection.Producer() q = Queue('foo') diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index f9dc685a..6b142d1f 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -220,6 +220,7 @@ class Transport(redis.Transport): class test_Channel(Case): + @skip_if_not_module('redis') def setUp(self): self.connection = self.create_connection() self.channel = self.connection.default_channel @@ -788,6 +789,7 @@ class test_Channel(Case): class test_Redis(Case): + @skip_if_not_module('redis') def setUp(self): self.connection = Connection(transport=Transport) self.exchange = Exchange('test_Redis', type='direct') @@ -944,6 +946,7 @@ def _redis_modules(): class test_MultiChannelPoller(Case): + @skip_if_not_module('redis') def setUp(self): self.Poller = redis.MultiChannelPoller diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index fff82a1f..2a8e103a 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -17,11 +17,26 @@ except ImportError: pass from struct import unpack -from amqplib import client_0_8 as amqp -from amqplib.client_0_8 import transport -from amqplib.client_0_8.channel import Channel as _Channel -from amqplib.client_0_8.exceptions import AMQPConnectionException -from amqplib.client_0_8.exceptions import AMQPChannelException +try: + from amqplib import client_0_8 as amqp + from amqplib.client_0_8 import transport + from amqplib.client_0_8.channel import Channel as _Channel + from amqplib.client_0_8.exceptions import AMQPConnectionException + from amqplib.client_0_8.exceptions import AMQPChannelException +except ImportError: # pragma: no cover + + class NA(object): + pass + + class NAx(object): + pass + amqp = NA + amqp.Connection = NA + transport = _Channel = NA # noqa + # Sphinx crashes if this is NA, must be different class + transport.TCPTransport = transport.SSLTransport = NAx + AMQPConnectionException = AMQPChannelException = NA # noqa + from kombu.five import items from kombu.utils.encoding import str_to_bytes @@ -321,6 +336,9 @@ class Transport(base.Transport): self.client = client self.default_port = kwargs.get('default_port') or self.default_port + if amqp is NA: + raise ImportError('Missing amqplib library (pip install amqplib)') + def create_channel(self, connection): return connection.channel() diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index b7688631..fd12575d 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -10,7 +10,6 @@ Beanstalk transport. """ from __future__ import absolute_import -import beanstalkc import socket from kombu.five import Empty @@ -19,6 +18,11 @@ from kombu.utils.json import loads, dumps from . import virtual +try: + import beanstalkc +except ImportError: # pragma: no cover + beanstalkc = None # noqa + DEFAULT_PORT = 11300 __author__ = 'David Ziegler <david.ziegler@gmail.com>' @@ -126,16 +130,25 @@ class Transport(virtual.Transport): default_port = DEFAULT_PORT connection_errors = ( virtual.Transport.connection_errors + ( - socket.error, beanstalkc.SocketError, IOError) + socket.error, IOError, + getattr(beanstalkc, 'SocketError', None), + ) ) channel_errors = ( virtual.Transport.channel_errors + ( socket.error, IOError, - beanstalkc.SocketError, - beanstalkc.BeanstalkcException) + getattr(beanstalkc, 'SocketError', None), + getattr(beanstalkc, 'BeanstalkcException', None), + ) ) driver_type = 'beanstalk' driver_name = 'beanstalkc' + def __init__(self, *args, **kwargs): + if beanstalkc is None: + raise ImportError( + 'Missing beanstalkc library (pip install beanstalkc)') + super(Transport, self).__init__(*args, **kwargs) + def driver_version(self): return beanstalkc.__version__ diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index a4d87d99..c7811ab9 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -11,7 +11,6 @@ CouchDB transport. from __future__ import absolute_import import socket -import couchdb from kombu.five import Empty from kombu.utils import uuid4 @@ -20,6 +19,11 @@ from kombu.utils.json import loads, dumps from . import virtual +try: + import couchdb +except ImportError: # pragma: no cover + couchdb = None # noqa + DEFAULT_PORT = 5984 DEFAULT_DATABASE = 'kombu_default' @@ -79,7 +83,9 @@ class Channel(virtual.Channel): port)) # Use username and password if avaliable try: - server.resource.credentials = (conninfo.userid, conninfo.password) + if conninfo.userid: + server.resource.credentials = (conninfo.userid, + conninfo.password) except AttributeError: pass try: @@ -109,20 +115,27 @@ class Transport(virtual.Transport): connection_errors = ( virtual.Transport.connection_errors + ( socket.error, - couchdb.HTTPError, - couchdb.ServerError, - couchdb.Unauthorized) + getattr(couchdb, 'HTTPError', None), + getattr(couchdb, 'ServerError', None), + getattr(couchdb, 'Unauthorized', None), + ) ) channel_errors = ( virtual.Transport.channel_errors + ( - couchdb.HTTPError, - couchdb.ServerError, - couchdb.PreconditionFailed, - couchdb.ResourceConflict, - couchdb.ResourceNotFound) + getattr(couchdb, 'HTTPError', None), + getattr(couchdb, 'ServerError', None), + getattr(couchdb, 'PreconditionFailed', None), + getattr(couchdb, 'ResourceConflict', None), + getattr(couchdb, 'ResourceNotFound', None), + ) ) driver_type = 'couchdb' driver_name = 'couchdb' + def __init__(self, *args, **kwargs): + if couchdb is None: + raise ImportError('Missing couchdb library (pip install couchdb)') + super(Transport, self).__init__(*args, **kwargs) + def driver_version(self): return couchdb.__version__ diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 30015a60..96ca0177 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -475,6 +475,8 @@ class Channel(virtual.Channel): crit('Could not restore message: %r', payload, exc_info=True) def _restore(self, message, leftmost=False): + if not self.ack_emulation: + return super(Channel, self)._restore(message) tag = message.delivery_tag with self.conn_or_acquire() as client: P, _ = client.pipeline() \ @@ -907,6 +909,8 @@ class Transport(virtual.Transport): driver_name = 'redis' def __init__(self, *args, **kwargs): + if redis is None: + raise ImportError('Missing redis library (pip install redis)') super(Transport, self).__init__(*args, **kwargs) # Get redis-py exceptions. diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index 0dd5a9da..27c6e65d 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -153,6 +153,7 @@ class Transport(virtual.Transport): default_port = 0 driver_type = 'sql' driver_name = 'sqlalchemy' + connection_errors = (OperationalError, ) def driver_version(self): import sqlalchemy diff --git a/kombu/utils/json.py b/kombu/utils/json.py index 9dd03429..a5227467 100644 --- a/kombu/utils/json.py +++ b/kombu/utils/json.py @@ -17,14 +17,11 @@ class JSONEncoder(json.JSONEncoder): def default(self, obj, _super=json.JSONEncoder.default): try: - _super(self, obj) - except TypeError: - try: - reducer = obj.__json__ - except AttributeError: - raise - else: - return reducer() + reducer = obj.__json__ + except AttributeError: + return _super(self, obj) + else: + return reducer() def dumps(s, _dumps=json.dumps, cls=JSONEncoder): diff --git a/requirements/extras/librabbitmq.txt b/requirements/extras/librabbitmq.txt index f43f4558..866d11bc 100644 --- a/requirements/extras/librabbitmq.txt +++ b/requirements/extras/librabbitmq.txt @@ -1 +1 @@ -librabbitmq>=1.5.1 +librabbitmq>=1.5.2 diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..5b8c32d7 --- /dev/null +++ b/tox.ini @@ -0,0 +1,59 @@ +[tox] +envlist = + 2.7, + 3.3, + 3.4, + pypy + +[testenv] +sitepackages = False +commands = nosetests + +[testenv:3.4] +basepython = python3.4 +deps = -r{toxinidir}/requirements/default.txt + -r{toxinidir}/requirements/test3.txt + -r{toxinidir}/requirements/test-ci3.txt +commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} + pip install -U -r{toxinidir}/requirements/dev.txt + nosetests -vds kombu.tests \ + --with-coverage --cover-inclusive --cover-erase [] + +[testenv:3.3] +basepython = python3.3 +deps = -r{toxinidir}/requirements/default.txt + -r{toxinidir}/requirements/test3.txt + -r{toxinidir}/requirements/test-ci3.txt +commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} + pip install -U -r{toxinidir}/requirements/dev.txt + nosetests -vds kombu.tests \ + --with-coverage --cover-inclusive --cover-erase [] + +[testenv:2.7] +basepython = python2.7 +deps = -r{toxinidir}/requirements/default.txt + -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-ci.txt +commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} + pip install -U -r{toxinidir}/requirements/dev.txt + nosetests --with-coverage --cover-inclusive --cover-erase [] + +[testenv:pypy] +basepython = pypy +deps = -r{toxinidir}/requirements/default.txt + -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-ci.txt +commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} + pip install -U -r{toxinidir}/requirements/dev.txt + nosetests --with-coverage --cover-inclusive --cover-erase [] + +[testenv:jython] +basepython = jython +recreate = True +where = .tox +deps = -r{toxinidir}/requirements/default.txt + -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-ci.txt +commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} + pip install -U -r{toxinidir}/requirements/dev.txt + nosetests --with-coverage --cover-inclusive --cover-erase [] |