diff options
| author | Ask Solem <ask@celeryproject.org> | 2011-09-12 10:47:22 +0100 |
|---|---|---|
| committer | Ask Solem <ask@celeryproject.org> | 2011-09-12 10:47:22 +0100 |
| commit | ac1af44fbaff0c743f3259e9fb3c8ca2047a5109 (patch) | |
| tree | 84eb7c2ec51656f6108152f2ca5d79d94112836b /examples/simple_task_queue | |
| parent | 3e82865f7f131c9e89a7cd762f251586ad594a7b (diff) | |
| download | kombu-ac1af44fbaff0c743f3259e9fb3c8ca2047a5109.tar.gz | |
Task queue example working
Diffstat (limited to 'examples/simple_task_queue')
| -rw-r--r-- | examples/simple_task_queue/client.py | 11 | ||||
| -rw-r--r-- | examples/simple_task_queue/queues.py | 1 | ||||
| -rw-r--r-- | examples/simple_task_queue/tasks.py | 2 | ||||
| -rw-r--r-- | examples/simple_task_queue/worker.py | 21 |
4 files changed, 19 insertions, 16 deletions
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py index 489aa2e6..eae36144 100644 --- a/examples/simple_task_queue/client.py +++ b/examples/simple_task_queue/client.py @@ -3,26 +3,27 @@ from __future__ import with_statement from kombu.common import maybe_declare from kombu.pools import producers -from .queues import task_exchange +from queues import task_exchange priority_to_routing_key = {"high": "hipri", "mid": "midpri", "low": "lopri"} -def send_as_task(connection, fun, args, kwargs, priority="mid"): + +def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"): payload = {"fun": fun, "args": args, "kwargs": kwargs} routing_key = priority_to_routing_key[priority] with producers[connection].acquire(block=True) as producer: maybe_declare(task_exchange, producer.channel) producer.publish(payload, serializer="pickle", - compression="zlib", + compression="bzip2", routing_key=routing_key) if __name__ == "__main__": from kombu import BrokerConnection - from .tasks import hello_task + from tasks import hello_task - conection = BrokerConnection("amqp://guest:guest@localhost:5672//") + connection = BrokerConnection("amqp://guest:guest@localhost:5672//") send_as_task(connection, fun=hello_task, args=("Kombu", ), priority="high") diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py index 5f1feef2..680e7575 100644 --- a/examples/simple_task_queue/queues.py +++ b/examples/simple_task_queue/queues.py @@ -4,4 +4,3 @@ task_exchange = Exchange("tasks", type="direct") task_queues = [Queue("hipri", task_exchange, routing_key="hipri"), Queue("midpri", task_exchange, routing_key="midpri"), Queue("lopri", task_exchange, routing_key="lopri")] - diff --git a/examples/simple_task_queue/tasks.py b/examples/simple_task_queue/tasks.py index 47146e45..f6e9da03 100644 --- a/examples/simple_task_queue/tasks.py +++ b/examples/simple_task_queue/tasks.py @@ -1,4 +1,2 @@ def hello_task(who="world"): print("Hello %s" % (who, )) - - diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 68e7d6b6..0fa4c261 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -1,22 +1,29 @@ from __future__ import with_statement -from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin -from kombu.utils import kwdict +from kombu.utils import kwdict, reprcall from queues import task_queues + class Worker(ConsumerMixin): + def __init__(self, connection): + self.connection = connection + def get_consumers(self, Consumer, channel): - return Consumer(queues=task_queues, - callbacks=[self.process_task]) + return [Consumer(queues=task_queues, + callbacks=[self.process_task])] - def process_task(body, message): + def process_task(self, body, message): fun = body["fun"] args = body["args"] kwargs = body["kwargs"] - fun(*args, **kwdict(kwargs)) + self.info("Got task: %s", reprcall(fun.__name__, args, kwargs)) + try: + fun(*args, **kwdict(kwargs)) + except Exception, exc: + self.error("task raised exception: %r", exc) message.ack() if __name__ == "__main__": @@ -26,5 +33,3 @@ if __name__ == "__main__": with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: Worker(conn).run() - - |
