.. _guide-simple:

==================
 Simple Interface
==================

.. contents::
    :local:


:mod:`kombu.simple` is a simple interface to AMQP queueing.
It is only slightly different from the :class:`~Queue.Queue` class in the
Python Standard Library, which makes it excellent for users with basic
messaging needs.
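
To make the comparison concrete, here is a minimal sketch of the standard
library interface that the simple classes resemble. It uses only the
``queue`` module (the Python 3 name of ``Queue``) and involves no broker;
``roundtrip`` is a hypothetical helper named for this illustration.

```python
import queue  # named ``Queue`` on Python 2


def roundtrip(items, timeout=0.1):
    """Put every item on a local queue, then read them back in order."""
    q = queue.Queue()
    for item in items:
        q.put(item)

    received = []
    while True:
        try:
            # Block for at most `timeout` seconds waiting for an item.
            received.append(q.get(block=True, timeout=timeout))
        except queue.Empty:
            # Raised when nothing arrives before the timeout expires.
            break
    return received


if __name__ == "__main__":
    print(roundtrip(["first", "second"]))
```

The simple classes follow the same ``put``/``get`` pattern, except that
``get`` returns a message object: the data is in its ``payload`` attribute
and the message is removed from the queue by calling ``ack()``.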

Instead of defining exchanges and queues, the simple classes require only
two arguments: a connection channel and a name. The name is used as the
queue name, the exchange name and the routing key. If the need arises, you
can pass a :class:`~kombu.entity.Queue` as the name argument instead.

In addition, the :class:`~kombu.connection.BrokerConnection` comes with
shortcuts to create simple queues using the current connection::

    >>> queue = connection.SimpleQueue("myqueue")
    >>> # ... do something with queue
    >>> queue.close()


This is equivalent to::

    >>> from kombu.simple import SimpleQueue

    >>> channel = connection.channel()
    >>> queue = SimpleQueue(channel, "myqueue")
    >>> # ... do something with queue
    >>> queue.close()
    >>> channel.close()

.. _simple-send-receive:

Sending and receiving messages
==============================

The simple interface defines two classes: :class:`~kombu.simple.SimpleQueue`
and :class:`~kombu.simple.SimpleBuffer`. The former is used for persistent
messages, and the latter for transient, buffer-like queues.
They both have the same interface, so you can use them interchangeably.

Here is an example using the :class:`~kombu.simple.SimpleQueue` class
to produce and consume logging messages:

.. code-block:: python

    from __future__ import with_statement

    from datetime import datetime
    from socket import gethostname
    from time import time

    from kombu import BrokerConnection


    class Logger(object):

        def __init__(self, connection, queue_name="log_queue",
                serializer="json", compression=None):
            self.queue = connection.SimpleQueue(queue_name)
            self.serializer = serializer
            self.compression = compression

        def log(self, message, level="INFO", context=None):
            self.queue.put({"message": message,
                            "level": level,
                            "context": context or {},
                            "hostname": gethostname(),
                            "timestamp": time()},
                           serializer=self.serializer,
                           compression=self.compression)

        def process(self, callback, n=1, timeout=1):
            for i in range(n):
                log_message = self.queue.get(block=True, timeout=timeout)
                entry = log_message.payload  # deserialized data.
                callback(entry)
                log_message.ack()  # remove message from queue

        def close(self):
            self.queue.close()


    if __name__ == "__main__":
        from contextlib import closing

        with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
            with closing(Logger(conn)) as logger:

                # Send message
                logger.log("Error happened while encoding video",
                            level="ERROR",
                            context={"filename": "cutekitten.mpg"})

                # Consume and process message

                # This is the callback called when a log message is
                # received.
                def dump_entry(entry):
                    date = datetime.fromtimestamp(entry["timestamp"])
                    print("[%s %s %s] %s %r" % (date,
                                                entry["hostname"],
                                                entry["level"],
                                                entry["message"],
                                                entry["context"]))

                # Process a single message using the callback above.
                logger.process(dump_entry, n=1)