diff options
-rw-r--r-- | AUTHORS | 3 | ||||
-rw-r--r-- | README.rst | 8 | ||||
-rw-r--r-- | examples/simple_task_queue/client.py | 2 | ||||
-rw-r--r-- | examples/simple_task_queue/worker.py | 2 | ||||
-rw-r--r-- | kombu/connection.py | 4 | ||||
-rw-r--r-- | kombu/serialization.py | 3 | ||||
-rw-r--r-- | kombu/transport/SQS.py | 3 | ||||
-rw-r--r-- | kombu/transport/couchdb.py | 9 |
8 files changed, 25 insertions, 9 deletions
@@ -10,6 +10,7 @@ Andrew Watts Andy McCurdy <andy@andymccurdy.com> Anton Gyllenberg <anton@iki.fi> Ask Solem <ask@celeryproject.org> +Christophe Chauvet <christophe.chauvet@gmail.com> Christopher Grebs <cg@webshox.org> Clay Gerrard <clay.gerrard@gmail.com> David Clymer <david@zettazebra.com> @@ -33,6 +34,7 @@ Patrick Schneider <patrick.p2k.schneider@gmail.com> Paul McLanahan <paul@mclanahan.net> Petar Radosevic <petar@wunki.org> Peter Hoffmann <tosh54@gmail.com> +Rafael Duran Castaneda <rafadurancastaneda@gmail.com> Ralf Nyren <ralf-github@nyren.net> Rob Ottaway <robottaway@gmail.com> Rune Halvorsen <runeh@opera.com> @@ -40,6 +42,7 @@ Ryan Petrello <lists@ryanpetrello.com> Sean Bleier <sebleier@gmail.com> Sean Creeley <sean.creeley@gmail.com> Shane Caraveo <shane@caraveo.com> +Stefan Eletzhofer <se@nexiles.de> Stephen Day <stevvooe@gmail.com> Tareque Hossain Tomaž Muraus <kami@k5-storitve.net> @@ -120,19 +120,23 @@ Quick overview media_exchange = Exchange("media", "direct", durable=True) video_queue = Queue("video", exchange=media_exchange, routing_key="video") + def process_media(body, message): + print body + message.ack() + # connections with BrokerConnection("amqp://guest:guest@localhost//") as conn: # produce with conn.Producer(exchange=media_exchange, - serializer="json") as producer: + serializer="json", routing_key="video") as producer: producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013}) # consume with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: # Process messages and handle events on all channels while True: - connection.drain_events() + conn.drain_events() # Consume from several queues on the same channel: video_queue = Queue("video", exchange=media_exchange, key="video") diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py index eae36144..1ab6175a 100644 --- a/examples/simple_task_queue/client.py +++ b/examples/simple_task_queue/client.py @@ -25,5 +25,5 @@ if __name__ == "__main__": from tasks import hello_task connection = BrokerConnection("amqp://guest:guest@localhost:5672//") - send_as_task(connection, fun=hello_task, args=("Kombu", ), + send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={}, priority="high") diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 8b521a43..063a6b4d 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -28,7 +28,7 @@ class Worker(ConsumerMixin): if __name__ == "__main__": from kombu import BrokerConnection - from kombu.log import setup_logging + from kombu.utils.debug import setup_logging setup_logging(loglevel="INFO") with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: diff --git a/kombu/connection.py b/kombu/connection.py index 7d39a1b8..c3309b84 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -464,11 +464,11 @@ class BrokerConnection(object): channel = self # use default channel support. return Producer(channel, *args, **kwargs) - def Consumer(self, channel=None, *args, **kwargs): + def Consumer(self, queues=None, channel=None, *args, **kwargs): from .messaging import Consumer if channel is None: channel = self # use default channel support. - return Consumer(channel, *args, **kwargs) + return Consumer(channel, queues, *args, **kwargs) def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs): diff --git a/kombu/serialization.py b/kombu/serialization.py index 0f5d3298..d0169c60 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -148,6 +148,9 @@ class SerializerRegistry(object): except KeyError: return data + if not data: + return data + return decoder(data) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 91a517a4..436a864b 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -26,6 +26,7 @@ from boto.sqs.connection import SQSConnection from boto.sqs.message import Message from ..utils import cached_property, uuid +from ..utils.encoding import safe_str from . import virtual @@ -155,7 +156,7 @@ class Channel(virtual.Channel): def entity_name(self, name, table=CHARS_REPLACE_TABLE): """Format AMQP queue name into a legal SQS queue name.""" - return name.encode(errors="replace").translate(table) + return safe_str(name).translate(table) def _new_queue(self, queue, **kwargs): """Ensures a queue exists in SQS.""" diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index 669edd21..748622c0 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -77,10 +77,15 @@ class Channel(virtual.Channel): server = couchdb.Server('%s://%s:%s/' % (proto, conninfo.hostname, port)) + # Use username and password if avaliable + try: + server.resource.credentials = (conninfo.userid, conninfo.password) + except AttributeError: + pass try: - return server.create(dbname) - except couchdb.PreconditionFailed: return server[dbname] + except couchdb.http.ResourceNotFound: + return server.create(dbname) def _query(self, queue, **kwargs): if not self.view_created: |