summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrian Bouterse <bmbouter@gmail.com>2014-06-11 09:21:03 -0400
committerBrian Bouterse <bmbouter@gmail.com>2014-06-11 09:21:03 -0400
commitc569d35c30284bb3deb228d7babd2ff578b76f2e (patch)
tree4540a84372cfb2d9c6e7d19dd8f90ebbee17e43a
parent286672f2eca9f497c44e34f1535cf5d166f0813a (diff)
parent244b882e0c103b915e98de6ebb7d0cb026974515 (diff)
downloadkombu-c569d35c30284bb3deb228d7babd2ff578b76f2e.tar.gz
Merge branch 'master' into qpid-transport
-rw-r--r--AUTHORS2
-rw-r--r--Changelog28
-rw-r--r--docs/reference/index.rst2
-rw-r--r--kombu/async/hub.py13
-rw-r--r--kombu/common.py45
-rw-r--r--kombu/connection.py1
-rw-r--r--kombu/entity.py4
-rw-r--r--kombu/messaging.py2
-rw-r--r--kombu/tests/test_common.py14
-rw-r--r--kombu/tests/test_entities.py2
-rw-r--r--kombu/tests/test_messaging.py2
-rw-r--r--kombu/tests/transport/test_redis.py3
-rw-r--r--kombu/transport/amqplib.py28
-rw-r--r--kombu/transport/beanstalk.py21
-rw-r--r--kombu/transport/couchdb.py33
-rw-r--r--kombu/transport/redis.py4
-rw-r--r--kombu/transport/sqlalchemy/__init__.py1
-rw-r--r--kombu/utils/json.py13
-rw-r--r--requirements/extras/librabbitmq.txt2
-rw-r--r--tox.ini59
20 files changed, 213 insertions, 66 deletions
diff --git a/AUTHORS b/AUTHORS
index ac3fed7d..1d90bc23 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -38,6 +38,7 @@ Dustin J. Mitchell <dustin@mozilla.com>
Ephemera <obliviscence+git@gmail.com>
Eric Reynolds <ereynolds@opendns.com>
Fabrice Rabaute <fabrice@expa.com>
+Felix Schwarz <felix.schwarz@oss.schwarz.eu>
Fernando Jorge Mota <f.j.mota13@gmail.com>
Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>
Florian Munz <surf@theflow.de>
@@ -62,6 +63,7 @@ Joseph Crosland <jcrosland@flumotion.com>
Keith Fitzgerald <ghostrocket@me.com>
Kevin McCarthy <me@kevinmccarthy.org>
Kevin McDonald <k3vinmcdonald@gmail.com>
+Latitia M. Haskins <lhaskins@jetsonsys.com>
Mahendra M <Mahendra_M@infosys.com>
Marcin Lulek (ergo) <info@webreactor.eu>
Mark Lavin <mlavin@caktusgroup.com>
diff --git a/Changelog b/Changelog
index 047ee32b..fdf55977 100644
--- a/Changelog
+++ b/Changelog
@@ -4,6 +4,34 @@
Change history
================
+.. _version-3.0.17:
+
+3.0.17
+======
+:release-date: 2014-06-02 06:00 P.M UTC
+:release-by: Ask Solem
+
+- ``kombu[librabbitmq]`` now depends on librabbitmq 1.5.2.
+
+- Async: Event loop now selectively removes file descriptors for the mode
+ it failed in, and keeps others (e.g read vs write).
+
+ Fix contributed by Roger Hu.
+
+- CouchDB: Now works without userid set.
+
+ Fix contributed by Latitia M. Haskins.
+
+- SQLAlchemy: Now supports recovery from connection errors.
+
+ Contributed by Felix Schwarz.
+
+- Redis: Restore at shutdown now works when ack emulation is disabled.
+
+- :func:`kombu.common.eventloop` accidentally swallowed socket errors.
+
+- Adds :func:`kombu.utils.url.sanitize_url`
+
.. _version-3.0.16:
3.0.16
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 965bb1d6..202c733a 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -27,7 +27,7 @@
kombu.async.hub
kombu.async.semaphore
kombu.async.timer
- kombu.async..debug
+ kombu.async.debug
kombu.transport
kombu.transport.pyamqp
kombu.transport.librabbitmq
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index e46f7908..408a065b 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -15,7 +15,7 @@ from contextlib import contextmanager
from time import sleep
from types import GeneratorType as generator
-from amqp import promise
+from amqp.promise import Thenable, promise
from kombu.five import Empty, range
from kombu.log import get_logger
@@ -79,7 +79,7 @@ class Hub(object):
self.writers = {}
self.on_tick = set()
self.on_close = set()
- self._ready = deque()
+ self._ready = set()
self._running = False
self._loop = None
@@ -183,9 +183,10 @@ class Hub(object):
self._loop = None
def call_soon(self, callback, *args):
- handle = promise(callback, args)
- self._ready.append(handle)
- return handle
+ if not isinstance(callback, Thenable):
+ callback = promise(callback, args)
+ self._ready.add(callback)
+ return callback
def call_later(self, delay, callback, *args):
return self.timer.call_after(delay, callback, args)
@@ -266,7 +267,7 @@ class Hub(object):
tick_callback()
while todo:
- item = todo.popleft()
+ item = todo.pop()
if item:
item()
diff --git a/kombu/common.py b/kombu/common.py
index 1a6e587b..799d0b64 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -22,7 +22,6 @@ from amqp import RecoverableConnectionError
from .entity import Exchange, Queue
from .five import range
from .log import get_logger
-from .messaging import Consumer as _Consumer
from .serialization import registry as serializers
from .utils import uuid
@@ -91,33 +90,39 @@ def declaration_cached(entity, channel):
def maybe_declare(entity, channel=None, retry=False, **retry_policy):
- if not entity.is_bound:
- assert channel
- entity = entity.bind(channel)
+ is_bound = entity.is_bound
+
+ if channel is None:
+ assert is_bound
+ channel = entity.channel
+
+ declared = ident = None
+ if channel.connection and entity.can_cache_declaration:
+ declared = channel.connection.client.declared_entities
+ ident = hash(entity)
+ if ident in declared:
+ return False
+
+ entity = entity if is_bound else entity.bind(channel)
if retry:
- return _imaybe_declare(entity, **retry_policy)
- return _maybe_declare(entity)
+ return _imaybe_declare(entity, declared, ident, channel, **retry_policy)
+ return _maybe_declare(entity, declared, ident, channel)
-def _maybe_declare(entity):
- channel = entity.channel
+def _maybe_declare(entity, declared, ident, channel):
+ channel = channel or entity.channel
if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
- if entity.can_cache_declaration:
- declared = channel.connection.client.declared_entities
- ident = hash(entity)
- if ident not in declared:
- entity.declare()
- declared.add(ident)
- return True
- return False
entity.declare()
+ if declared is not None and ident:
+ declared.add(ident)
return True
-def _imaybe_declare(entity, **retry_policy):
+def _imaybe_declare(entity, declared, ident, channel, **retry_policy):
return entity.channel.connection.client.ensure(
- entity, _maybe_declare, **retry_policy)(entity)
+ entity, _maybe_declare, **retry_policy)(
+ entity, declared, ident, channel)
def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
@@ -138,8 +143,8 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
def itermessages(conn, channel, queue, limit=1, timeout=None,
- Consumer=_Consumer, callbacks=None, **kwargs):
- return drain_consumer(Consumer(channel, queues=[queue], **kwargs),
+ callbacks=None, **kwargs):
+ return drain_consumer(conn.Consumer(channel, queues=[queue], **kwargs),
limit=limit, timeout=timeout, callbacks=callbacks)
diff --git a/kombu/connection.py b/kombu/connection.py
index 513d64b2..8e770a3b 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -194,6 +194,7 @@ class Connection(object):
"""Switch connection parameters to use a new URL (does not
reconnect)"""
self.close()
+ self.declared_entities.clear()
self._closed = False
self._init_params(**dict(self._initial_params, **parse_url(url)))
diff --git a/kombu/entity.py b/kombu/entity.py
index fda53bef..53777c68 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -288,7 +288,7 @@ class Exchange(MaybeChannelBound):
@property
def can_cache_declaration(self):
- return self.durable and not self.auto_delete
+ return True
class binding(object):
@@ -672,7 +672,7 @@ class Queue(MaybeChannelBound):
@property
def can_cache_declaration(self):
- return self.durable and not self.auto_delete
+ return True
@classmethod
def from_dict(self, queue, **options):
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 98d59d45..8b923950 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -11,6 +11,7 @@ import numbers
from itertools import count
+from .common import maybe_declare
from .compression import compress
from .connection import maybe_channel, is_connection
from .entity import Exchange, Queue, DELIVERY_MODES
@@ -107,7 +108,6 @@ class Producer(object):
"""Declare the exchange if it hasn't already been declared
during this session."""
if entity:
- from .common import maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
def publish(self, body, routing_key=None, delivery_mode=None,
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 34406992..c4eebb71 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -105,6 +105,8 @@ class test_maybe_declare(Case):
def test_with_retry(self):
channel = Mock()
+ client = channel.connection.client = Mock()
+ client.declared_entities = set()
entity = Mock()
entity.can_cache_declaration = True
entity.is_bound = True
@@ -265,8 +267,8 @@ class test_itermessages(Case):
conn = self.MockConnection()
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
ret = next(it)
self.assertTupleEqual(ret, ('body', 'message'))
@@ -279,8 +281,8 @@ class test_itermessages(Case):
conn.should_raise_timeout = True
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
with self.assertRaises(StopIteration):
next(it)
@@ -291,8 +293,8 @@ class test_itermessages(Case):
deque_instance.popleft.side_effect = IndexError()
conn = self.MockConnection()
channel = Mock()
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
with self.assertRaises(StopIteration):
next(it)
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index 165160f1..0ce359d4 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -285,7 +285,7 @@ class test_Queue(Case):
def test_can_cache_declaration(self):
self.assertTrue(Queue('a', durable=True).can_cache_declaration)
- self.assertFalse(Queue('a', durable=False).can_cache_declaration)
+ self.assertTrue(Queue('a', durable=False).can_cache_declaration)
def test_eq(self):
q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 877d36cc..6bcf89b3 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -36,7 +36,7 @@ class test_Producer(Case):
p = Producer(None)
self.assertFalse(p._channel)
- @patch('kombu.common.maybe_declare')
+ @patch('kombu.messaging.maybe_declare')
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
q = Queue('foo')
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index f9dc685a..6b142d1f 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -220,6 +220,7 @@ class Transport(redis.Transport):
class test_Channel(Case):
+ @skip_if_not_module('redis')
def setUp(self):
self.connection = self.create_connection()
self.channel = self.connection.default_channel
@@ -788,6 +789,7 @@ class test_Channel(Case):
class test_Redis(Case):
+ @skip_if_not_module('redis')
def setUp(self):
self.connection = Connection(transport=Transport)
self.exchange = Exchange('test_Redis', type='direct')
@@ -944,6 +946,7 @@ def _redis_modules():
class test_MultiChannelPoller(Case):
+ @skip_if_not_module('redis')
def setUp(self):
self.Poller = redis.MultiChannelPoller
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index fff82a1f..2a8e103a 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -17,11 +17,26 @@ except ImportError:
pass
from struct import unpack
-from amqplib import client_0_8 as amqp
-from amqplib.client_0_8 import transport
-from amqplib.client_0_8.channel import Channel as _Channel
-from amqplib.client_0_8.exceptions import AMQPConnectionException
-from amqplib.client_0_8.exceptions import AMQPChannelException
+try:
+ from amqplib import client_0_8 as amqp
+ from amqplib.client_0_8 import transport
+ from amqplib.client_0_8.channel import Channel as _Channel
+ from amqplib.client_0_8.exceptions import AMQPConnectionException
+ from amqplib.client_0_8.exceptions import AMQPChannelException
+except ImportError: # pragma: no cover
+
+ class NA(object):
+ pass
+
+ class NAx(object):
+ pass
+ amqp = NA
+ amqp.Connection = NA
+ transport = _Channel = NA # noqa
+ # Sphinx crashes if this is NA, must be different class
+ transport.TCPTransport = transport.SSLTransport = NAx
+ AMQPConnectionException = AMQPChannelException = NA # noqa
+
from kombu.five import items
from kombu.utils.encoding import str_to_bytes
@@ -321,6 +336,9 @@ class Transport(base.Transport):
self.client = client
self.default_port = kwargs.get('default_port') or self.default_port
+ if amqp is NA:
+ raise ImportError('Missing amqplib library (pip install amqplib)')
+
def create_channel(self, connection):
return connection.channel()
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index b7688631..fd12575d 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -10,7 +10,6 @@ Beanstalk transport.
"""
from __future__ import absolute_import
-import beanstalkc
import socket
from kombu.five import Empty
@@ -19,6 +18,11 @@ from kombu.utils.json import loads, dumps
from . import virtual
+try:
+ import beanstalkc
+except ImportError: # pragma: no cover
+ beanstalkc = None # noqa
+
DEFAULT_PORT = 11300
__author__ = 'David Ziegler <david.ziegler@gmail.com>'
@@ -126,16 +130,25 @@ class Transport(virtual.Transport):
default_port = DEFAULT_PORT
connection_errors = (
virtual.Transport.connection_errors + (
- socket.error, beanstalkc.SocketError, IOError)
+ socket.error, IOError,
+ getattr(beanstalkc, 'SocketError', None),
+ )
)
channel_errors = (
virtual.Transport.channel_errors + (
socket.error, IOError,
- beanstalkc.SocketError,
- beanstalkc.BeanstalkcException)
+ getattr(beanstalkc, 'SocketError', None),
+ getattr(beanstalkc, 'BeanstalkcException', None),
+ )
)
driver_type = 'beanstalk'
driver_name = 'beanstalkc'
+ def __init__(self, *args, **kwargs):
+ if beanstalkc is None:
+ raise ImportError(
+ 'Missing beanstalkc library (pip install beanstalkc)')
+ super(Transport, self).__init__(*args, **kwargs)
+
def driver_version(self):
return beanstalkc.__version__
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index a4d87d99..c7811ab9 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -11,7 +11,6 @@ CouchDB transport.
from __future__ import absolute_import
import socket
-import couchdb
from kombu.five import Empty
from kombu.utils import uuid4
@@ -20,6 +19,11 @@ from kombu.utils.json import loads, dumps
from . import virtual
+try:
+ import couchdb
+except ImportError: # pragma: no cover
+ couchdb = None # noqa
+
DEFAULT_PORT = 5984
DEFAULT_DATABASE = 'kombu_default'
@@ -79,7 +83,9 @@ class Channel(virtual.Channel):
port))
# Use username and password if avaliable
try:
- server.resource.credentials = (conninfo.userid, conninfo.password)
+ if conninfo.userid:
+ server.resource.credentials = (conninfo.userid,
+ conninfo.password)
except AttributeError:
pass
try:
@@ -109,20 +115,27 @@ class Transport(virtual.Transport):
connection_errors = (
virtual.Transport.connection_errors + (
socket.error,
- couchdb.HTTPError,
- couchdb.ServerError,
- couchdb.Unauthorized)
+ getattr(couchdb, 'HTTPError', None),
+ getattr(couchdb, 'ServerError', None),
+ getattr(couchdb, 'Unauthorized', None),
+ )
)
channel_errors = (
virtual.Transport.channel_errors + (
- couchdb.HTTPError,
- couchdb.ServerError,
- couchdb.PreconditionFailed,
- couchdb.ResourceConflict,
- couchdb.ResourceNotFound)
+ getattr(couchdb, 'HTTPError', None),
+ getattr(couchdb, 'ServerError', None),
+ getattr(couchdb, 'PreconditionFailed', None),
+ getattr(couchdb, 'ResourceConflict', None),
+ getattr(couchdb, 'ResourceNotFound', None),
+ )
)
driver_type = 'couchdb'
driver_name = 'couchdb'
+ def __init__(self, *args, **kwargs):
+ if couchdb is None:
+ raise ImportError('Missing couchdb library (pip install couchdb)')
+ super(Transport, self).__init__(*args, **kwargs)
+
def driver_version(self):
return couchdb.__version__
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 30015a60..96ca0177 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -475,6 +475,8 @@ class Channel(virtual.Channel):
crit('Could not restore message: %r', payload, exc_info=True)
def _restore(self, message, leftmost=False):
+ if not self.ack_emulation:
+ return super(Channel, self)._restore(message)
tag = message.delivery_tag
with self.conn_or_acquire() as client:
P, _ = client.pipeline() \
@@ -907,6 +909,8 @@ class Transport(virtual.Transport):
driver_name = 'redis'
def __init__(self, *args, **kwargs):
+ if redis is None:
+ raise ImportError('Missing redis library (pip install redis)')
super(Transport, self).__init__(*args, **kwargs)
# Get redis-py exceptions.
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index 0dd5a9da..27c6e65d 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -153,6 +153,7 @@ class Transport(virtual.Transport):
default_port = 0
driver_type = 'sql'
driver_name = 'sqlalchemy'
+ connection_errors = (OperationalError, )
def driver_version(self):
import sqlalchemy
diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index 9dd03429..a5227467 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -17,14 +17,11 @@ class JSONEncoder(json.JSONEncoder):
def default(self, obj, _super=json.JSONEncoder.default):
try:
- _super(self, obj)
- except TypeError:
- try:
- reducer = obj.__json__
- except AttributeError:
- raise
- else:
- return reducer()
+ reducer = obj.__json__
+ except AttributeError:
+ return _super(self, obj)
+ else:
+ return reducer()
def dumps(s, _dumps=json.dumps, cls=JSONEncoder):
diff --git a/requirements/extras/librabbitmq.txt b/requirements/extras/librabbitmq.txt
index f43f4558..866d11bc 100644
--- a/requirements/extras/librabbitmq.txt
+++ b/requirements/extras/librabbitmq.txt
@@ -1 +1 @@
-librabbitmq>=1.5.1
+librabbitmq>=1.5.2
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 00000000..5b8c32d7
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,59 @@
+[tox]
+envlist =
+ 2.7,
+ 3.3,
+ 3.4,
+ pypy
+
+[testenv]
+sitepackages = False
+commands = nosetests
+
+[testenv:3.4]
+basepython = python3.4
+deps = -r{toxinidir}/requirements/default.txt
+ -r{toxinidir}/requirements/test3.txt
+ -r{toxinidir}/requirements/test-ci3.txt
+commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
+ pip install -U -r{toxinidir}/requirements/dev.txt
+ nosetests -vds kombu.tests \
+ --with-coverage --cover-inclusive --cover-erase []
+
+[testenv:3.3]
+basepython = python3.3
+deps = -r{toxinidir}/requirements/default.txt
+ -r{toxinidir}/requirements/test3.txt
+ -r{toxinidir}/requirements/test-ci3.txt
+commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
+ pip install -U -r{toxinidir}/requirements/dev.txt
+ nosetests -vds kombu.tests \
+ --with-coverage --cover-inclusive --cover-erase []
+
+[testenv:2.7]
+basepython = python2.7
+deps = -r{toxinidir}/requirements/default.txt
+ -r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-ci.txt
+commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
+ pip install -U -r{toxinidir}/requirements/dev.txt
+ nosetests --with-coverage --cover-inclusive --cover-erase []
+
+[testenv:pypy]
+basepython = pypy
+deps = -r{toxinidir}/requirements/default.txt
+ -r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-ci.txt
+commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
+ pip install -U -r{toxinidir}/requirements/dev.txt
+ nosetests --with-coverage --cover-inclusive --cover-erase []
+
+[testenv:jython]
+basepython = jython
+recreate = True
+where = .tox
+deps = -r{toxinidir}/requirements/default.txt
+ -r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-ci.txt
+commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
+ pip install -U -r{toxinidir}/requirements/dev.txt
+ nosetests --with-coverage --cover-inclusive --cover-erase []