summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS2
-rw-r--r--funtests/tests/test_couchdb.py2
-rw-r--r--kombu/transport/couchdb.py84
-rw-r--r--kombu/transport/django/managers.py2
-rw-r--r--kombu/transport/redis.py5
-rw-r--r--kombu/utils/eventio.py2
-rw-r--r--requirements/extras/couchdb.txt2
7 files changed, 57 insertions, 42 deletions
diff --git a/AUTHORS b/AUTHORS
index 519e59cf..000ab239 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -74,6 +74,7 @@ Maxime Rouyrre <rouyrre+git@gmail.com>
Mher Movsisyan <mher.movsisyan@gmail.com>
Michael Barrett <mb@eventbrite.com>
Michael Nelson <michaeln@telesign.com>
+Nathan Van Gheem <vangheem@gmail.com>
Nitzan Miron <bug.assembla@bugbug.me>
Noah Kantrowitz <noah@coderanger.net>
Ollie Walsh <ollie.walsh@geemail.kom>
@@ -83,6 +84,7 @@ Paul McLanahan <paul@mclanahan.net>
Petar Radosevic <petar@wunki.org>
Peter Hoffmann <tosh54@gmail.com>
Pierre Riteau <priteau@ci.uchicago.edu>
+Radek Czajka <radoslaw.czajka@nowoczesnapolska.org.pl>
Rafael Duran Castaneda <rafadurancastaneda@gmail.com>
Rafal Malinowski <malinowski@red-sky.pl>
Ralf Nyren <ralf-github@nyren.net>
diff --git a/funtests/tests/test_couchdb.py b/funtests/tests/test_couchdb.py
index 697d0e21..3f0f218f 100644
--- a/funtests/tests/test_couchdb.py
+++ b/funtests/tests/test_couchdb.py
@@ -10,7 +10,7 @@ class test_couchdb(transport.TransportCase):
def before_connect(self):
try:
- import couchdb # noqa
+ import pycouchdb # noqa
except ImportError:
raise SkipTest('couchdb not installed')
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index c7811ab9..7b0356e4 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -20,9 +20,11 @@ from kombu.utils.json import loads, dumps
from . import virtual
try:
- import couchdb
+ import pycouchdb
+ from pycouchdb import exceptions
+ from requests import exceptions as requests_exceptions
except ImportError: # pragma: no cover
- couchdb = None # noqa
+ pycouchdb = exceptions = requests_exceptions = None # noqa
DEFAULT_PORT = 5984
DEFAULT_DATABASE = 'kombu_default'
@@ -30,17 +32,22 @@ DEFAULT_DATABASE = 'kombu_default'
__author__ = 'David Clymer <david@zettazebra.com>'
-def create_message_view(db):
- from couchdb import design
-
- view = design.ViewDefinition('kombu', 'messages', """
+def create_message_view(container):
+ _id = '_design/kombu'
+ try:
+ existing = container.get(_id)
+ except exceptions.NotFound:
+ existing = {'_id': _id, 'views': {}}
+ existing['language'] = 'javascript'
+ existing['views']['messages'] = {
+ 'map': """
function (doc) {
if (doc.queue && doc.payload)
emit(doc.queue, doc);
}
- """)
- if not view.get_doc(db):
- view.sync(db)
+ """
+ }
+ container.save(existing)
class Channel(virtual.Channel):
@@ -58,14 +65,17 @@ class Channel(virtual.Channel):
if not result:
raise Empty()
- item = result.rows[0].value
- self.client.delete(item)
+ try:
+ item = result[0]['value']
+ except (KeyError, IndexError):
+ raise Empty()
+ self.client.delete(item['_id'])
return loads(bytes_to_str(item['payload']))
def _purge(self, queue):
result = self._query(queue)
for item in result:
- self.client.delete(item.value)
+ self.client.delete(item['value']['_id'])
return len(result)
def _size(self, queue):
@@ -78,19 +88,18 @@ class Channel(virtual.Channel):
if not dbname or dbname == '/':
dbname = DEFAULT_DATABASE
port = conninfo.port or DEFAULT_PORT
- server = couchdb.Server('%s://%s:%s/' % (proto,
- conninfo.hostname,
- port))
- # Use username and password if avaliable
- try:
- if conninfo.userid:
- server.resource.credentials = (conninfo.userid,
- conninfo.password)
- except AttributeError:
- pass
+
+ if conninfo.userid and conninfo.password:
+ server = pycouchdb.Server('%s://%s:%s@%s:%s/' % (
+ proto, conninfo.userid, conninfo.password,
+ conninfo.hostname, port),
+ authmethod='basic')
+ else:
+ server = pycouchdb.Server('%s://%s:%s/' % (
+ proto, conninfo.hostname, port))
try:
- return server[dbname]
- except couchdb.http.ResourceNotFound:
+ return server.database(dbname)
+ except exceptions.NotFound:
return server.create(dbname)
def _query(self, queue, **kwargs):
@@ -98,7 +107,7 @@ class Channel(virtual.Channel):
# if the message view is not yet set up, we'll need it now.
create_message_view(self.client)
self.view_created = True
- return self.client.view('kombu/messages', key=queue, **kwargs)
+ return list(self.client.query('kombu/messages', key=queue, **kwargs))
@property
def client(self):
@@ -115,27 +124,30 @@ class Transport(virtual.Transport):
connection_errors = (
virtual.Transport.connection_errors + (
socket.error,
- getattr(couchdb, 'HTTPError', None),
- getattr(couchdb, 'ServerError', None),
- getattr(couchdb, 'Unauthorized', None),
+ getattr(exceptions, 'AuthenticationFailed', None),
+ getattr(requests_exceptions, 'HTTPError', None),
+ getattr(requests_exceptions, 'ConnectionError', None),
+ getattr(requests_exceptions, 'SSLError', None),
+ getattr(requests_exceptions, 'Timeout', None)
)
)
channel_errors = (
virtual.Transport.channel_errors + (
- getattr(couchdb, 'HTTPError', None),
- getattr(couchdb, 'ServerError', None),
- getattr(couchdb, 'PreconditionFailed', None),
- getattr(couchdb, 'ResourceConflict', None),
- getattr(couchdb, 'ResourceNotFound', None),
+ getattr(exceptions, 'Error', None),
+ getattr(exceptions, 'UnexpectedError', None),
+ getattr(exceptions, 'NotFound', None),
+ getattr(exceptions, 'Conflict', None),
+ getattr(exceptions, 'ResourceNotFound', None),
+ getattr(exceptions, 'GenericError', None)
)
)
driver_type = 'couchdb'
driver_name = 'couchdb'
def __init__(self, *args, **kwargs):
- if couchdb is None:
- raise ImportError('Missing couchdb library (pip install couchdb)')
+ if pycouchdb is None:
+ raise ImportError('Missing pycouchdb library (pip install pycouchdb)') # noqa
super(Transport, self).__init__(*args, **kwargs)
def driver_version(self):
- return couchdb.__version__
+ return pycouchdb.__version__
diff --git a/kombu/transport/django/managers.py b/kombu/transport/django/managers.py
index 7d00c68c..8e816315 100644
--- a/kombu/transport/django/managers.py
+++ b/kombu/transport/django/managers.py
@@ -19,6 +19,7 @@ else:
def _commit(*args, **kwargs):
with transaction.atomic():
return fun(*args, **kwargs)
+ return _commit
@@ -75,7 +76,6 @@ class MessageManager(models.Manager):
recv[0] += 1
if not recv[0] % self.cleanup_every:
self.cleanup()
- transaction.commit()
return result.payload
except self.model.DoesNotExist:
pass
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 301b1ef3..d5e7489c 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -144,8 +144,9 @@ class QoS(virtual.QoS):
super(QoS, self).append(message, delivery_tag)
def restore_unacked(self):
- for tag in self._delivered:
- self.restore_by_tag(tag)
+ with self.channel.conn_or_acquire(client) as client:
+ for tag in self._delivered:
+ self.restore_by_tag(tag, client=client)
self._delivered.clear()
def ack(self, delivery_tag):
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 6de7e7ae..660acf87 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -235,7 +235,7 @@ class _poll(object):
events |= ERR
if not isinstance(fd, Integral):
fd = fd.fileno()
- ready.append(fd, events)
+ ready.append((fd, events))
return ready
def close(self):
diff --git a/requirements/extras/couchdb.txt b/requirements/extras/couchdb.txt
index 3e100d4b..bc7a1a32 100644
--- a/requirements/extras/couchdb.txt
+++ b/requirements/extras/couchdb.txt
@@ -1 +1 @@
-couchdb
+pycouchdb