summaryrefslogtreecommitdiff
path: root/examples/simple_task_queue/worker.py
blob: 8b521a43c193dc70a222d6e2ed292b9b2c3fc79e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from __future__ import with_statement

from kombu.mixins import ConsumerMixin
from kombu.utils import kwdict, reprcall

from queues import task_queues


class Worker(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=task_queues,
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        fun = body["fun"]
        args = body["args"]
        kwargs = body["kwargs"]
        self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwdict(kwargs))
        except Exception, exc:
            self.error("task raised exception: %r", exc)
        message.ack()

if __name__ == "__main__":
    from kombu import BrokerConnection
    from kombu.log import setup_logging
    setup_logging(loglevel="INFO")

    with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
        try:
            Worker(conn).run()
        except KeyboardInterrupt:
            print("bye bye")