diff options
author | Ask Solem <ask@celeryproject.org> | 2011-09-12 10:25:42 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2011-09-12 10:25:42 +0100 |
commit | 09146d0b933b96aa5d5675a54cba6402f0970210 (patch) | |
tree | b0bc24a4401a8051f7a094f12c60339ba6ae68ed /examples | |
parent | e845c0140448c4afbeeb7484f891defda3d6211f (diff) | |
download | kombu-09146d0b933b96aa5d5675a54cba6402f0970210.tar.gz |
Adds examples to userguide
Diffstat (limited to 'examples')
-rw-r--r-- | examples/simple_task_queue/__init__.py | 0 | ||||
-rw-r--r-- | examples/simple_task_queue/client.py | 28 | ||||
-rw-r--r-- | examples/simple_task_queue/queues.py | 7 | ||||
-rw-r--r-- | examples/simple_task_queue/tasks.py | 4 | ||||
-rw-r--r-- | examples/simple_task_queue/worker.py | 30 |
5 files changed, 69 insertions, 0 deletions
diff --git a/examples/simple_task_queue/__init__.py b/examples/simple_task_queue/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/examples/simple_task_queue/__init__.py diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py new file mode 100644 index 00000000..489aa2e6 --- /dev/null +++ b/examples/simple_task_queue/client.py @@ -0,0 +1,28 @@ +from __future__ import with_statement + +from kombu.common import maybe_declare +from kombu.pools import producers + +from .queues import task_exchange + +priority_to_routing_key = {"high": "hipri", + "mid": "midpri", + "low": "lopri"} + +def send_as_task(connection, fun, args, kwargs, priority="mid"): + payload = {"fun": fun, "args": args, "kwargs": kwargs} + routing_key = priority_to_routing_key[priority] + + with producers[connection].acquire(block=True) as producer: + maybe_declare(task_exchange, producer.channel) + producer.publish(payload, serializer="pickle", + compression="zlib", + routing_key=routing_key) + +if __name__ == "__main__": + from kombu import BrokerConnection + from .tasks import hello_task + + conection = BrokerConnection("amqp://guest:guest@localhost:5672//") + send_as_task(connection, fun=hello_task, args=("Kombu", ), + priority="high") diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py new file mode 100644 index 00000000..5f1feef2 --- /dev/null +++ b/examples/simple_task_queue/queues.py @@ -0,0 +1,7 @@ +from kombu import Exchange, Queue + +task_exchange = Exchange("tasks", type="direct") +task_queues = [Queue("hipri", task_exchange, routing_key="hipri"), + Queue("midpri", task_exchange, routing_key="midpri"), + Queue("lopri", task_exchange, routing_key="lopri")] + diff --git a/examples/simple_task_queue/tasks.py b/examples/simple_task_queue/tasks.py new file mode 100644 index 00000000..47146e45 --- /dev/null +++ b/examples/simple_task_queue/tasks.py @@ -0,0 +1,4 @@ +def hello_task(who="world"): + print("Hello %s" % (who, )) + + diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py new file mode 100644 index 00000000..68e7d6b6 --- /dev/null +++ b/examples/simple_task_queue/worker.py @@ -0,0 +1,30 @@ +from __future__ import with_statement + +from kombu import Exchange, Queue +from kombu.mixins import ConsumerMixin +from kombu.utils import kwdict + +from queues import task_queues + +class Worker(ConsumerMixin): + + def get_consumers(self, Consumer, channel): + return Consumer(queues=task_queues, + callbacks=[self.process_task]) + + def process_task(body, message): + fun = body["fun"] + args = body["args"] + kwargs = body["kwargs"] + fun(*args, **kwdict(kwargs)) + message.ack() + +if __name__ == "__main__": + from kombu import BrokerConnection + from kombu.log import setup_logging + setup_logging(loglevel="INFO") + + with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + Worker(conn).run() + + |