summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS3
-rw-r--r--README.rst8
-rw-r--r--examples/simple_task_queue/client.py2
-rw-r--r--examples/simple_task_queue/worker.py2
-rw-r--r--kombu/connection.py4
-rw-r--r--kombu/serialization.py3
-rw-r--r--kombu/transport/SQS.py3
-rw-r--r--kombu/transport/couchdb.py9
8 files changed, 25 insertions, 9 deletions
diff --git a/AUTHORS b/AUTHORS
index 656c23d3..9fe137d6 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -10,6 +10,7 @@ Andrew Watts
Andy McCurdy <andy@andymccurdy.com>
Anton Gyllenberg <anton@iki.fi>
Ask Solem <ask@celeryproject.org>
+Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com>
David Clymer <david@zettazebra.com>
@@ -33,6 +34,7 @@ Patrick Schneider <patrick.p2k.schneider@gmail.com>
Paul McLanahan <paul@mclanahan.net>
Petar Radosevic <petar@wunki.org>
Peter Hoffmann <tosh54@gmail.com>
+Rafael Duran Castaneda <rafadurancastaneda@gmail.com>
Ralf Nyren <ralf-github@nyren.net>
Rob Ottaway <robottaway@gmail.com>
Rune Halvorsen <runeh@opera.com>
@@ -40,6 +42,7 @@ Ryan Petrello <lists@ryanpetrello.com>
Sean Bleier <sebleier@gmail.com>
Sean Creeley <sean.creeley@gmail.com>
Shane Caraveo <shane@caraveo.com>
+Stefan Eletzhofer <se@nexiles.de>
Stephen Day <stevvooe@gmail.com>
Tareque Hossain
Tomaž Muraus <kami@k5-storitve.net>
diff --git a/README.rst b/README.rst
index 6c1d7e20..b00a5b9a 100644
--- a/README.rst
+++ b/README.rst
@@ -120,19 +120,23 @@ Quick overview
media_exchange = Exchange("media", "direct", durable=True)
video_queue = Queue("video", exchange=media_exchange, routing_key="video")
+ def process_media(body, message):
+ print body
+ message.ack()
+
# connections
with BrokerConnection("amqp://guest:guest@localhost//") as conn:
# produce
with conn.Producer(exchange=media_exchange,
- serializer="json") as producer:
+ serializer="json", routing_key="video") as producer:
producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013})
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
- connection.drain_events()
+ conn.drain_events()
# Consume from several queues on the same channel:
video_queue = Queue("video", exchange=media_exchange, key="video")
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py
index eae36144..1ab6175a 100644
--- a/examples/simple_task_queue/client.py
+++ b/examples/simple_task_queue/client.py
@@ -25,5 +25,5 @@ if __name__ == "__main__":
from tasks import hello_task
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
- send_as_task(connection, fun=hello_task, args=("Kombu", ),
+ send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={},
priority="high")
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index 8b521a43..063a6b4d 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -28,7 +28,7 @@ class Worker(ConsumerMixin):
if __name__ == "__main__":
from kombu import BrokerConnection
- from kombu.log import setup_logging
+ from kombu.utils.debug import setup_logging
setup_logging(loglevel="INFO")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
diff --git a/kombu/connection.py b/kombu/connection.py
index 7d39a1b8..c3309b84 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -464,11 +464,11 @@ class BrokerConnection(object):
channel = self # use default channel support.
return Producer(channel, *args, **kwargs)
- def Consumer(self, channel=None, *args, **kwargs):
+ def Consumer(self, queues=None, channel=None, *args, **kwargs):
from .messaging import Consumer
if channel is None:
channel = self # use default channel support.
- return Consumer(channel, *args, **kwargs)
+ return Consumer(channel, queues, *args, **kwargs)
def SimpleQueue(self, name, no_ack=None, queue_opts=None,
exchange_opts=None, channel=None, **kwargs):
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 0f5d3298..d0169c60 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -148,6 +148,9 @@ class SerializerRegistry(object):
except KeyError:
return data
+ if not data:
+ return data
+
return decoder(data)
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 91a517a4..436a864b 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -26,6 +26,7 @@ from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
from ..utils import cached_property, uuid
+from ..utils.encoding import safe_str
from . import virtual
@@ -155,7 +156,7 @@ class Channel(virtual.Channel):
def entity_name(self, name, table=CHARS_REPLACE_TABLE):
"""Format AMQP queue name into a legal SQS queue name."""
- return name.encode(errors="replace").translate(table)
+ return safe_str(name).translate(table)
def _new_queue(self, queue, **kwargs):
"""Ensures a queue exists in SQS."""
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index 669edd21..748622c0 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -77,10 +77,15 @@ class Channel(virtual.Channel):
server = couchdb.Server('%s://%s:%s/' % (proto,
conninfo.hostname,
port))
+ # Use username and password if avaliable
+ try:
+ server.resource.credentials = (conninfo.userid, conninfo.password)
+ except AttributeError:
+ pass
try:
- return server.create(dbname)
- except couchdb.PreconditionFailed:
return server[dbname]
+ except couchdb.http.ResourceNotFound:
+ return server.create(dbname)
def _query(self, queue, **kwargs):
if not self.view_created: