diff options
-rw-r--r-- | AUTHORS | 2 | ||||
-rw-r--r-- | funtests/tests/test_couchdb.py | 2 | ||||
-rw-r--r-- | kombu/transport/couchdb.py | 84 | ||||
-rw-r--r-- | kombu/transport/django/managers.py | 2 | ||||
-rw-r--r-- | kombu/transport/redis.py | 5 | ||||
-rw-r--r-- | kombu/utils/eventio.py | 2 | ||||
-rw-r--r-- | requirements/extras/couchdb.txt | 2 |
7 files changed, 57 insertions, 42 deletions
@@ -74,6 +74,7 @@ Maxime Rouyrre <rouyrre+git@gmail.com> Mher Movsisyan <mher.movsisyan@gmail.com> Michael Barrett <mb@eventbrite.com> Michael Nelson <michaeln@telesign.com> +Nathan Van Gheem <vangheem@gmail.com> Nitzan Miron <bug.assembla@bugbug.me> Noah Kantrowitz <noah@coderanger.net> Ollie Walsh <ollie.walsh@geemail.kom> @@ -83,6 +84,7 @@ Paul McLanahan <paul@mclanahan.net> Petar Radosevic <petar@wunki.org> Peter Hoffmann <tosh54@gmail.com> Pierre Riteau <priteau@ci.uchicago.edu> +Radek Czajka <radoslaw.czajka@nowoczesnapolska.org.pl> Rafael Duran Castaneda <rafadurancastaneda@gmail.com> Rafal Malinowski <malinowski@red-sky.pl> Ralf Nyren <ralf-github@nyren.net> diff --git a/funtests/tests/test_couchdb.py b/funtests/tests/test_couchdb.py index 697d0e21..3f0f218f 100644 --- a/funtests/tests/test_couchdb.py +++ b/funtests/tests/test_couchdb.py @@ -10,7 +10,7 @@ class test_couchdb(transport.TransportCase): def before_connect(self): try: - import couchdb # noqa + import pycouchdb # noqa except ImportError: raise SkipTest('couchdb not installed') diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index c7811ab9..7b0356e4 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -20,9 +20,11 @@ from kombu.utils.json import loads, dumps from . import virtual try: - import couchdb + import pycouchdb + from pycouchdb import exceptions + from requests import exceptions as requests_exceptions except ImportError: # pragma: no cover - couchdb = None # noqa + pycouchdb = exceptions = requests_exceptions = None # noqa DEFAULT_PORT = 5984 DEFAULT_DATABASE = 'kombu_default' @@ -30,17 +32,22 @@ DEFAULT_DATABASE = 'kombu_default' __author__ = 'David Clymer <david@zettazebra.com>' -def create_message_view(db): - from couchdb import design - - view = design.ViewDefinition('kombu', 'messages', """ +def create_message_view(container): + _id = '_design/kombu' + try: + existing = container.get(_id) + except exceptions.NotFound: + existing = {'_id': _id, 'views': {}} + existing['language'] = 'javascript' + existing['views']['messages'] = { + 'map': """ function (doc) { if (doc.queue && doc.payload) emit(doc.queue, doc); } - """) - if not view.get_doc(db): - view.sync(db) + """ + } + container.save(existing) class Channel(virtual.Channel): @@ -58,14 +65,17 @@ class Channel(virtual.Channel): if not result: raise Empty() - item = result.rows[0].value - self.client.delete(item) + try: + item = result[0]['value'] + except (KeyError, IndexError): + raise Empty() + self.client.delete(item['_id']) return loads(bytes_to_str(item['payload'])) def _purge(self, queue): result = self._query(queue) for item in result: - self.client.delete(item.value) + self.client.delete(item['value']['_id']) return len(result) def _size(self, queue): @@ -78,19 +88,18 @@ class Channel(virtual.Channel): if not dbname or dbname == '/': dbname = DEFAULT_DATABASE port = conninfo.port or DEFAULT_PORT - server = couchdb.Server('%s://%s:%s/' % (proto, - conninfo.hostname, - port)) - # Use username and password if avaliable - try: - if conninfo.userid: - server.resource.credentials = (conninfo.userid, - conninfo.password) - except AttributeError: - pass + + if conninfo.userid and conninfo.password: + server = pycouchdb.Server('%s://%s:%s@%s:%s/' % ( + proto, conninfo.userid, conninfo.password, + conninfo.hostname, port), + authmethod='basic') + else: + server = pycouchdb.Server('%s://%s:%s/' % ( + proto, conninfo.hostname, port)) try: - return server[dbname] - except couchdb.http.ResourceNotFound: + return server.database(dbname) + except exceptions.NotFound: return server.create(dbname) def _query(self, queue, **kwargs): @@ -98,7 +107,7 @@ class Channel(virtual.Channel): # if the message view is not yet set up, we'll need it now. create_message_view(self.client) self.view_created = True - return self.client.view('kombu/messages', key=queue, **kwargs) + return list(self.client.query('kombu/messages', key=queue, **kwargs)) @property def client(self): @@ -115,27 +124,30 @@ class Transport(virtual.Transport): connection_errors = ( virtual.Transport.connection_errors + ( socket.error, - getattr(couchdb, 'HTTPError', None), - getattr(couchdb, 'ServerError', None), - getattr(couchdb, 'Unauthorized', None), + getattr(exceptions, 'AuthenticationFailed', None), + getattr(requests_exceptions, 'HTTPError', None), + getattr(requests_exceptions, 'ConnectionError', None), + getattr(requests_exceptions, 'SSLError', None), + getattr(requests_exceptions, 'Timeout', None) ) ) channel_errors = ( virtual.Transport.channel_errors + ( - getattr(couchdb, 'HTTPError', None), - getattr(couchdb, 'ServerError', None), - getattr(couchdb, 'PreconditionFailed', None), - getattr(couchdb, 'ResourceConflict', None), - getattr(couchdb, 'ResourceNotFound', None), + getattr(exceptions, 'Error', None), + getattr(exceptions, 'UnexpectedError', None), + getattr(exceptions, 'NotFound', None), + getattr(exceptions, 'Conflict', None), + getattr(exceptions, 'ResourceNotFound', None), + getattr(exceptions, 'GenericError', None) ) ) driver_type = 'couchdb' driver_name = 'couchdb' def __init__(self, *args, **kwargs): - if couchdb is None: - raise ImportError('Missing couchdb library (pip install couchdb)') + if pycouchdb is None: + raise ImportError('Missing pycouchdb library (pip install pycouchdb)') # noqa super(Transport, self).__init__(*args, **kwargs) def driver_version(self): - return couchdb.__version__ + return pycouchdb.__version__ diff --git a/kombu/transport/django/managers.py b/kombu/transport/django/managers.py index 7d00c68c..8e816315 100644 --- a/kombu/transport/django/managers.py +++ b/kombu/transport/django/managers.py @@ -19,6 +19,7 @@ else: def _commit(*args, **kwargs): with transaction.atomic(): return fun(*args, **kwargs) + return _commit @@ -75,7 +76,6 @@ class MessageManager(models.Manager): recv[0] += 1 if not recv[0] % self.cleanup_every: self.cleanup() - transaction.commit() return result.payload except self.model.DoesNotExist: pass diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 301b1ef3..d5e7489c 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -144,8 +144,9 @@ class QoS(virtual.QoS): super(QoS, self).append(message, delivery_tag) def restore_unacked(self): - for tag in self._delivered: - self.restore_by_tag(tag) + with self.channel.conn_or_acquire(client) as client: + for tag in self._delivered: + self.restore_by_tag(tag, client=client) self._delivered.clear() def ack(self, delivery_tag): diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index 6de7e7ae..660acf87 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -235,7 +235,7 @@ class _poll(object): events |= ERR if not isinstance(fd, Integral): fd = fd.fileno() - ready.append(fd, events) + ready.append((fd, events)) return ready def close(self): diff --git a/requirements/extras/couchdb.txt b/requirements/extras/couchdb.txt index 3e100d4b..bc7a1a32 100644 --- a/requirements/extras/couchdb.txt +++ b/requirements/extras/couchdb.txt @@ -1 +1 @@ -couchdb +pycouchdb |