summaryrefslogtreecommitdiff
path: root/examples/simple_task_queue
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-09-12 10:47:22 +0100
committerAsk Solem <ask@celeryproject.org>2011-09-12 10:47:22 +0100
commitac1af44fbaff0c743f3259e9fb3c8ca2047a5109 (patch)
tree84eb7c2ec51656f6108152f2ca5d79d94112836b /examples/simple_task_queue
parent3e82865f7f131c9e89a7cd762f251586ad594a7b (diff)
downloadkombu-ac1af44fbaff0c743f3259e9fb3c8ca2047a5109.tar.gz
Task queue example working
Diffstat (limited to 'examples/simple_task_queue')
-rw-r--r--examples/simple_task_queue/client.py11
-rw-r--r--examples/simple_task_queue/queues.py1
-rw-r--r--examples/simple_task_queue/tasks.py2
-rw-r--r--examples/simple_task_queue/worker.py21
4 files changed, 19 insertions, 16 deletions
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py
index 489aa2e6..eae36144 100644
--- a/examples/simple_task_queue/client.py
+++ b/examples/simple_task_queue/client.py
@@ -3,26 +3,27 @@ from __future__ import with_statement
from kombu.common import maybe_declare
from kombu.pools import producers
-from .queues import task_exchange
+from queues import task_exchange
priority_to_routing_key = {"high": "hipri",
"mid": "midpri",
"low": "lopri"}
-def send_as_task(connection, fun, args, kwargs, priority="mid"):
+
+def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"):
payload = {"fun": fun, "args": args, "kwargs": kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
producer.publish(payload, serializer="pickle",
- compression="zlib",
+ compression="bzip2",
routing_key=routing_key)
if __name__ == "__main__":
from kombu import BrokerConnection
- from .tasks import hello_task
+ from tasks import hello_task
- conection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
send_as_task(connection, fun=hello_task, args=("Kombu", ),
priority="high")
diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py
index 5f1feef2..680e7575 100644
--- a/examples/simple_task_queue/queues.py
+++ b/examples/simple_task_queue/queues.py
@@ -4,4 +4,3 @@ task_exchange = Exchange("tasks", type="direct")
task_queues = [Queue("hipri", task_exchange, routing_key="hipri"),
Queue("midpri", task_exchange, routing_key="midpri"),
Queue("lopri", task_exchange, routing_key="lopri")]
-
diff --git a/examples/simple_task_queue/tasks.py b/examples/simple_task_queue/tasks.py
index 47146e45..f6e9da03 100644
--- a/examples/simple_task_queue/tasks.py
+++ b/examples/simple_task_queue/tasks.py
@@ -1,4 +1,2 @@
def hello_task(who="world"):
print("Hello %s" % (who, ))
-
-
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index 68e7d6b6..0fa4c261 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -1,22 +1,29 @@
from __future__ import with_statement
-from kombu import Exchange, Queue
from kombu.mixins import ConsumerMixin
-from kombu.utils import kwdict
+from kombu.utils import kwdict, reprcall
from queues import task_queues
+
class Worker(ConsumerMixin):
+ def __init__(self, connection):
+ self.connection = connection
+
def get_consumers(self, Consumer, channel):
- return Consumer(queues=task_queues,
- callbacks=[self.process_task])
+ return [Consumer(queues=task_queues,
+ callbacks=[self.process_task])]
- def process_task(body, message):
+ def process_task(self, body, message):
fun = body["fun"]
args = body["args"]
kwargs = body["kwargs"]
- fun(*args, **kwdict(kwargs))
+ self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
+ try:
+ fun(*args, **kwdict(kwargs))
+ except Exception, exc:
+ self.error("task raised exception: %r", exc)
message.ack()
if __name__ == "__main__":
@@ -26,5 +33,3 @@ if __name__ == "__main__":
with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
Worker(conn).run()
-
-