summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-09-12 10:25:42 +0100
committerAsk Solem <ask@celeryproject.org>2011-09-12 10:25:42 +0100
commit09146d0b933b96aa5d5675a54cba6402f0970210 (patch)
treeb0bc24a4401a8051f7a094f12c60339ba6ae68ed /examples
parente845c0140448c4afbeeb7484f891defda3d6211f (diff)
downloadkombu-09146d0b933b96aa5d5675a54cba6402f0970210.tar.gz
Adds examples to userguide
Diffstat (limited to 'examples')
-rw-r--r--examples/simple_task_queue/__init__.py0
-rw-r--r--examples/simple_task_queue/client.py28
-rw-r--r--examples/simple_task_queue/queues.py7
-rw-r--r--examples/simple_task_queue/tasks.py4
-rw-r--r--examples/simple_task_queue/worker.py30
5 files changed, 69 insertions, 0 deletions
diff --git a/examples/simple_task_queue/__init__.py b/examples/simple_task_queue/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/examples/simple_task_queue/__init__.py
diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py
new file mode 100644
index 00000000..489aa2e6
--- /dev/null
+++ b/examples/simple_task_queue/client.py
@@ -0,0 +1,28 @@
+from __future__ import with_statement
+
+from kombu.common import maybe_declare
+from kombu.pools import producers
+
+from .queues import task_exchange
+
+priority_to_routing_key = {"high": "hipri",
+ "mid": "midpri",
+ "low": "lopri"}
+
+def send_as_task(connection, fun, args, kwargs, priority="mid"):
+ payload = {"fun": fun, "args": args, "kwargs": kwargs}
+ routing_key = priority_to_routing_key[priority]
+
+ with producers[connection].acquire(block=True) as producer:
+ maybe_declare(task_exchange, producer.channel)
+ producer.publish(payload, serializer="pickle",
+ compression="zlib",
+ routing_key=routing_key)
+
+if __name__ == "__main__":
+ from kombu import BrokerConnection
+ from .tasks import hello_task
+
+ conection = BrokerConnection("amqp://guest:guest@localhost:5672//")
+ send_as_task(connection, fun=hello_task, args=("Kombu", ),
+ priority="high")
diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py
new file mode 100644
index 00000000..5f1feef2
--- /dev/null
+++ b/examples/simple_task_queue/queues.py
@@ -0,0 +1,7 @@
+from kombu import Exchange, Queue
+
+task_exchange = Exchange("tasks", type="direct")
+task_queues = [Queue("hipri", task_exchange, routing_key="hipri"),
+ Queue("midpri", task_exchange, routing_key="midpri"),
+ Queue("lopri", task_exchange, routing_key="lopri")]
+
diff --git a/examples/simple_task_queue/tasks.py b/examples/simple_task_queue/tasks.py
new file mode 100644
index 00000000..47146e45
--- /dev/null
+++ b/examples/simple_task_queue/tasks.py
@@ -0,0 +1,4 @@
+def hello_task(who="world"):
+ print("Hello %s" % (who, ))
+
+
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
new file mode 100644
index 00000000..68e7d6b6
--- /dev/null
+++ b/examples/simple_task_queue/worker.py
@@ -0,0 +1,30 @@
+from __future__ import with_statement
+
+from kombu import Exchange, Queue
+from kombu.mixins import ConsumerMixin
+from kombu.utils import kwdict
+
+from queues import task_queues
+
+class Worker(ConsumerMixin):
+
+ def get_consumers(self, Consumer, channel):
+ return Consumer(queues=task_queues,
+ callbacks=[self.process_task])
+
+ def process_task(body, message):
+ fun = body["fun"]
+ args = body["args"]
+ kwargs = body["kwargs"]
+ fun(*args, **kwdict(kwargs))
+ message.ack()
+
+if __name__ == "__main__":
+ from kombu import BrokerConnection
+ from kombu.log import setup_logging
+ setup_logging(loglevel="INFO")
+
+ with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
+ Worker(conn).run()
+
+