diff options
46 files changed, 1539 insertions, 701 deletions
@@ -64,11 +64,11 @@ Exchanges/Queue can be bound to a channel:: Introduction ------------ -``kombu`` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message +`kombu` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security. -The aim of ``kombu`` is to make messaging in Python as easy as possible by +The aim of `kombu` is to make messaging in Python as easy as possible by providing a high-level interface for producing and consuming messages. At the same time it is a goal to re-use what is already available as much as possible. @@ -81,7 +81,7 @@ Several AMQP message broker implementations exists, including `RabbitMQ`_, `Apache ActiveMQ`_. You'll need to have one of these installed, personally we've been using `RabbitMQ`_. -Before you start playing with ``kombu``, you should probably read up on +Before you start playing with `kombu`, you should probably read up on AMQP, and you could start with the excellent article about using RabbitMQ under Python, `Rabbits and warrens`_. For more detailed information, you can refer to the `Wikipedia article about AMQP`_. @@ -108,15 +108,15 @@ Kombu is using Sphinx, and the latest documentation is available at GitHub: Installation ============ -You can install ``kombu`` either via the Python Package Index (PyPI) +You can install `kombu` either via the Python Package Index (PyPI) or from source. -To install using ``pip``,:: +To install using `pip`,:: $ pip install kombu -To install using ``easy_install``,:: +To install using `easy_install`,:: $ easy_install kombu @@ -167,7 +167,7 @@ There are some concepts you should be familiar with before starting: * Direct exchange Matches if the routing key property of the message and - the ``routing_key`` attribute of the consumer are identical. + the `routing_key` attribute of the consumer are identical. * Fan-out exchange @@ -178,12 +178,12 @@ There are some concepts you should be familiar with before starting: Matches the routing key property of the message by a primitive pattern matching scheme. The message routing key then consists - of words separated by dots (``"."``, like domain names), and - two special characters are available; star (``"*"``) and hash - (``"#"``). The star matches any word, and the hash matches - zero or more words. For example ``"*.stock.#"`` matches the - routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not - ``"stock.nasdaq"``. + of words separated by dots (`"."`, like domain names), and + two special characters are available; star (`"*"`) and hash + (`"#"`). The star matches any word, and the hash matches + zero or more words. For example `"*.stock.#"` matches the + routing keys `"usd.stock"` and `"eur.stock.db"` but not + `"stock.nasdaq"`. Examples @@ -193,7 +193,7 @@ Creating a connection --------------------- You can set up a connection by creating an instance of - ``kombu.BrokerConnection``, with the appropriate options for + `kombu.BrokerConnection`, with the appropriate options for your broker: >>> from kombu import BrokerConnection @@ -207,8 +207,8 @@ Receiving messages using a Consumer First we open up a Python shell and start a message consumer. -This consumer declares a queue named ``"feed"``, receiving messages with -the routing key ``"importer"`` from the ``"feed"`` exchange. +This consumer declares a queue named `"feed"`, receiving messages with +the routing key `"importer"` from the `"feed"` exchange. >>> from kombu import Exchange, Queue, Consumer @@ -258,7 +258,7 @@ Serialization of Data By default every message is encoded using `JSON`_, so sending Python data structures like dictionaries and lists works. -`YAML`_, `msgpack`_ and Python's built-in ``pickle`` module is also supported, +`YAML`_, `msgpack`_ and Python's built-in `pickle` module is also supported, and if needed you can register any custom serialization scheme you want to use. @@ -268,12 +268,12 @@ want to use. Each option has its advantages and disadvantages. -``json`` -- JSON is supported in many programming languages, is now +`json` -- JSON is supported in many programming languages, is now a standard part of Python (since 2.6), and is fairly fast to - decode using the modern Python libraries such as ``cjson or - ``simplejson``. + decode using the modern Python libraries such as `cjson` or + `simplejson`. - The primary disadvantage to ``JSON`` is that it limits you to + The primary disadvantage to `JSON` is that it limits you to the following data types: strings, unicode, floats, boolean, dictionaries, and lists. Decimals and dates are notably missing. @@ -282,16 +282,16 @@ Each option has its advantages and disadvantages. encoding which supports native binary types. However, if your data fits inside the above constraints and - you need cross-language support, the default setting of ``JSON`` + you need cross-language support, the default setting of `JSON` is probably your best choice. -``pickle`` -- If you have no desire to support any language other than - Python, then using the ``pickle`` encoding will gain you +`pickle` -- If you have no desire to support any language other than + Python, then using the `pickle` encoding will gain you the support of all built-in Python data types (except class instances), smaller messages when sending binary files, and a slight speedup - over ``JSON`` processing. + over `JSON` processing. -``yaml`` -- YAML has many of the same characteristics as ``json``, +`yaml` -- YAML has many of the same characteristics as `json`, except that it natively supports more data types (including dates, recursive references, etc.) @@ -299,7 +299,7 @@ Each option has its advantages and disadvantages. than the libraries for JSON. If you need a more expressive set of data types and need to maintain - cross-language compatibility, then ``YAML`` may be a better fit + cross-language compatibility, then `YAML` may be a better fit than the above. To instruct carrot to use an alternate serialization method, @@ -316,7 +316,7 @@ use one of the following options. >>> producer.publish(message, routing_key=rkey, ... serializer="pickle") -Note that a ``Consumer`` do not need the serialization method specified. +Note that a `Consumer` do not need the serialization method specified. They can auto-detect the serialization method as the content-type is sent as a message header. @@ -327,7 +327,7 @@ In some cases, you don't need your message data to be serialized. If you pass in a plain string or unicode object as your message, then carrot will not waste cycles serializing/deserializing the data. -You can optionally specify a ``content_type`` and ``content_encoding`` +You can optionally specify a `content_type` and `content_encoding` for the raw data: >>> producer.send(open('~/my_picture.jpg','rb').read(), @@ -335,15 +335,15 @@ for the raw data: content_encoding="binary", routing_key=rkey) -The ``Message`` object returned by the ``Consumer`` class will have a -``content_type`` and ``content_encoding`` attribute. +The `Message` object returned by the `Consumer` class will have a +`content_type` and `content_encoding` attribute. Receiving messages without a callback -------------------------------------- -You can also poll the queue manually, by using the ``get`` method. -This method returns a ``Message`` object, from where you can get the +You can also poll the queue manually, by using the `get` method. +This method returns a `Message` object, from where you can get the message body, de-serialize the body to get the data, acknowledge, reject or re-queue the message. @@ -359,7 +359,7 @@ re-queue the message. Sub-classing the messaging classes ---------------------------------- -The ``Consumer``, and ``Producer`` classes can also be sub classed. Thus you +The `Consumer`, and `Producer` classes can also be sub classed. Thus you can define the above producer and consumer like so: >>> class FeedProducer(Producer): @@ -409,7 +409,7 @@ to our issue tracker at http://github.com/ask/kombu/issues/ Contributing ============ -Development of ``kombu`` happens at Github: http://github.com/ask/kombu +Development of `kombu` happens at Github: http://github.com/ask/kombu You are highly encouraged to participate in the development. If you don't like Github (for some reason) you're welcome to send regular patches. @@ -417,5 +417,5 @@ like Github (for some reason) you're welcome to send regular patches. License ======= -This software is licensed under the ``New BSD License``. See the ``LICENSE`` +This software is licensed under the `New BSD License`. See the :file:`LICENSE` file in the top distribution directory for the full license text. diff --git a/docs/_ext/literals_to_xrefs.py b/docs/_ext/literals_to_xrefs.py index 9a220028..e9dc7ca0 100644 --- a/docs/_ext/literals_to_xrefs.py +++ b/docs/_ext/literals_to_xrefs.py @@ -68,8 +68,8 @@ def fixliterals(fname): new.append(m.group(0)) continue - sys.stdout.write("\n"+"-"*80+"\n") - sys.stdout.write(data[prev_start+1:m.start()]) + sys.stdout.write("\n" + "-" * 80 + "\n") + sys.stdout.write(data[prev_start + 1:m.start()]) sys.stdout.write(colorize(m.group(0), fg="red")) sys.stdout.write(data[m.end():next_end]) sys.stdout.write("\n\n") diff --git a/docs/reference/kombu.abstract.rst b/docs/reference/kombu.abstract.rst index bf9334d7..0669a5a8 100644 --- a/docs/reference/kombu.abstract.rst +++ b/docs/reference/kombu.abstract.rst @@ -1,11 +1,10 @@ -========================================================== - Object Utilities - kombu.abstract -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.abstract .. automodule:: kombu.abstract - :members: - :undoc-members: + + .. contents:: + :local: + + .. autoclass:: MaybeChannelBound + :members: + :undoc-members: diff --git a/docs/reference/kombu.compat.rst b/docs/reference/kombu.compat.rst index 860a0ff6..ffb4793f 100644 --- a/docs/reference/kombu.compat.rst +++ b/docs/reference/kombu.compat.rst @@ -1,11 +1,36 @@ -========================================================== - Carrot Compatible API - kombu.compat -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.compat .. automodule:: kombu.compat - :members: - :undoc-members: + + .. contents:: + :local: + + Publisher + --------- + + Replace with :class:`kombu.messaging.Producer`. + + .. autoclass:: Publisher + :members: + :undoc-members: + :inherited-members: + + Consumer + -------- + + Replace with :class:`kombu.messaging.Consumer`. + + .. autoclass:: Consumer + :members: + :undoc-members: + :inherited-members: + + ConsumerSet + ----------- + + Replace with :class:`kombu.messaging.Consumer`. + + .. autoclass:: ConsumerSet + :members: + :undoc-members: + :inherited-members: diff --git a/docs/reference/kombu.compression.rst b/docs/reference/kombu.compression.rst index 446889c2..c7748577 100644 --- a/docs/reference/kombu.compression.rst +++ b/docs/reference/kombu.compression.rst @@ -1,11 +1,20 @@ -========================================================== - Compression - kombu.compression -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.compression .. automodule:: kombu.compression - :members: - :undoc-members: + + .. contents:: + :local: + + Encoding/decoding + ----------------- + + .. autofunction:: compress + .. autofunction:: decompress + + Registry + -------- + + .. autofunction:: encoders + .. autofunction:: get_encoder + .. autofunction:: get_decoder + .. autofunction:: register diff --git a/docs/reference/kombu.connection.rst b/docs/reference/kombu.connection.rst index 9313c172..511295bc 100644 --- a/docs/reference/kombu.connection.rst +++ b/docs/reference/kombu.connection.rst @@ -1,11 +1,63 @@ -========================================================== - Broker Connections - kombu.connection -========================================================== -.. contents:: - :local: + .. currentmodule:: kombu.connection .. automodule:: kombu.connection - :members: - :undoc-members: + + .. contents:: + :local: + + Connection + ---------- + + .. autoclass:: BrokerConnection + + **ATTRIBUTES** + + .. autoattribute:: connection_errors + .. autoattribute:: channel_errors + .. autoattribute:: transport + .. autoattribute:: host + .. autoattribute:: connection + + **METHODS** + + .. automethod:: connect + .. automethod:: channel + .. automethod:: drain_events + .. automethod:: release + .. automethod:: ensure_connection + .. automethod:: ensure + .. automethod:: create_transport + .. automethod:: get_transport_cls + .. automethod:: clone + .. automethod:: info + + .. automethod:: Pool + .. automethod:: ChannelPool + .. automethod:: SimpleQueue + .. automethod:: SimpleBuffer + + + Pools + ----- + + .. seealso:: + + The shortcut methods :meth:`BrokerConnection.Pool` and + :meth:`BrokerConnection.ChannelPool` is the recommended way + to instantiate these classes. + + .. autoclass:: ConnectionPool + + .. autoattribute:: LimitExceeded + + .. automethod:: acquire + .. automethod:: release + + .. autoclass:: ChannelPool + + .. autoattribute:: LimitExceeded + + .. automethod:: acquire + .. automethod:: release diff --git a/docs/reference/kombu.entity.rst b/docs/reference/kombu.entity.rst index 2bb6f72a..79da6ea4 100644 --- a/docs/reference/kombu.entity.rst +++ b/docs/reference/kombu.entity.rst @@ -1,11 +1,67 @@ -========================================================== - AMQP Entities - kombu.entity -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.entity .. automodule:: kombu.entity - :members: - :undoc-members: + + .. contents:: + :local: + + Exchange + -------- + + Example creating an exchange declaration:: + + >>> news_exchange = Exchange("news", type="topic") + + For now `news_exchange` is just a declaration, you can't perform + actions on it. It just describes the name and options for the exchange. + + The exchange can be bound or unbound. Bound means the exchange is + associated with a channel and operations can be performed on it. + To bind the exchange you call the exchange with the channel as argument:: + + >>> bound_exchange = news_exchange(channel) + + Now you can perform operations like :meth:`declare` or :meth:`delete`:: + + >>> bound_exchange.declare() + >>> message = bound_exchange.Message("Cure for cancer found!") + >>> bound_exchange.publish(message, routing_key="news.science") + >>> bound_exchange.delete() + + .. autoclass:: Exchange + :members: + :undoc-members: + + .. automethod:: maybe_bind + + Queue + ----- + + Example creating a queue using our exchange in the :class:`Exchange` + example:: + + >>> science_news = Queue("science_news", + ... exchange=news_exchange, + ... routing_key="news.science") + + For now `science_news` is just a declaration, you can't perform + actions on it. It just describes the name and options for the queue. + + The queue can be bound or unbound. Bound means the queue is + associated with a channel and operations can be performed on it. + To bind the queue you call the queue instance with the channel as + an argument:: + + >>> bound_science_news = science_news(channel) + + Now you can perform operations like :meth:`declare` or :meth:`purge`:: + + >>> bound_sicence_news.declare() + >>> bound_science_news.purge() + >>> bound_science_news.delete() + + .. autoclass:: Queue + :members: + :undoc-members: + + .. automethod:: maybe_bind diff --git a/docs/reference/kombu.exceptions.rst b/docs/reference/kombu.exceptions.rst index 9bf2cec5..d0b4bd50 100644 --- a/docs/reference/kombu.exceptions.rst +++ b/docs/reference/kombu.exceptions.rst @@ -1,11 +1,15 @@ -========================================================== - Exceptions - kombu.exceptions -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.exceptions .. automodule:: kombu.exceptions - :members: - :undoc-members: + + .. contents:: + :local: + + .. autoexception:: NotBoundError + .. autoexception:: MessageStateError + .. autoexception:: EnsureExhausted + .. autoexception:: TimeoutError + .. autoexception:: LimitExceeded + .. autoexception:: ConnectionLimitExceeded + .. autoexception:: ChannelLimitExceeded + diff --git a/docs/reference/kombu.messaging.rst b/docs/reference/kombu.messaging.rst index cf43440b..9b68d128 100644 --- a/docs/reference/kombu.messaging.rst +++ b/docs/reference/kombu.messaging.rst @@ -1,11 +1,47 @@ -========================================================== - Messaging - kombu.messaging -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.messaging .. automodule:: kombu.messaging - :members: - :undoc-members: + + .. contents:: + :local: + + Message Producer + ---------------- + + .. autoclass:: Producer + + .. autoattribute:: channel + .. autoattribute:: exchange + .. autoattribute:: routing_key + .. autoattribute:: serializer + .. autoattribute:: compression + .. autoattribute:: auto_declare + .. autoattribute:: on_return + + .. automethod:: declare + .. automethod:: publish + .. automethod:: revive + + Message Consumer + ---------------- + + .. autoclass:: Consumer + + .. autoattribute:: channel + .. autoattribute:: queues + .. autoattribute:: no_ack + .. autoattribute:: auto_declare + .. autoattribute:: callbacks + .. autoattribute:: on_decode_error + + .. automethod:: declare + .. automethod:: register_callback + .. automethod:: consume + .. automethod:: cancel + .. automethod:: cancel_by_queue + .. automethod:: purge + .. automethod:: flow + .. automethod:: qos + .. automethod:: recover + .. automethod:: receive + .. automethod:: revive diff --git a/docs/reference/kombu.pidbox.rst b/docs/reference/kombu.pidbox.rst index 1a8dba5f..f47bfd4c 100644 --- a/docs/reference/kombu.pidbox.rst +++ b/docs/reference/kombu.pidbox.rst @@ -1,11 +1,89 @@ -================================ - Process Mailbox - kombu.pidbox -================================ - -.. contents:: - :local: .. currentmodule:: kombu.pidbox .. automodule:: kombu.pidbox - :members: - :undoc-members: + + .. contents:: + :local: + + Introduction + ------------ + + Creating the applications Mailbox + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + .. code-block:: python + + >>> mailbox = pidbox.Mailbox("celerybeat", type="direct") + + >>> @mailbox.handler + >>> def reload_schedule(state, **kwargs): + ... state["beat"].reload_schedule() + + >>> @mailbox.handler + >>> def connection_info(state, **kwargs): + ... return {"connection": state["connection"].info()} + + Example Node + ~~~~~~~~~~~~ + + .. code-block:: python + + >>> connection = kombu.BrokerConnection() + >>> state = {"beat": beat, + "connection": connection} + >>> consumer = mailbox(connection).Node(hostname).listen() + >>> try: + ... while True: + ... connection.drain_events(timeout=1) + ... finally: + ... consumer.cancel() + + Example Client + ~~~~~~~~~~~~~~ + + .. code-block:: python + + >>> mailbox.cast("reload_schedule") # cast is async. + >>> info = celerybeat.call("connection_info", timeout=1) + + Mailbox + ------- + + .. autoclass:: Mailbox + + .. autoattribute:: namespace + .. autoattribute:: connection + .. autoattribute:: type + .. autoattribute:: exchange + .. autoattribute:: reply_exchange + + .. automethod:: Node + .. automethod:: call + .. automethod:: cast + .. automethod:: abcast + .. automethod:: multi_call + .. automethod:: get_reply_queue + .. automethod:: get_queue + + Node + ---- + + .. autoclass:: Node + + .. autoattribute:: hostname + .. autoattribute:: mailbox + .. autoattribute:: handlers + .. autoattribute:: state + .. autoattribute:: channel + + .. automethod:: Consumer + .. automethod:: handler + .. automethod:: listen + .. automethod:: dispatch + .. automethod:: dispatch_from_message + .. automethod:: handle_call + .. automethod:: handle_cast + .. automethod:: handle + .. automethod:: handle_message + .. automethod:: reply + diff --git a/docs/reference/kombu.serialization.rst b/docs/reference/kombu.serialization.rst index 8e21998a..7ed56ff9 100644 --- a/docs/reference/kombu.serialization.rst +++ b/docs/reference/kombu.serialization.rst @@ -1,11 +1,47 @@ -========================================================== - Serialization - kombu.serialization -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.serialization .. automodule:: kombu.serialization - :members: - :undoc-members: + + .. contents:: + :local: + + Overview + -------- + + Centralized support for encoding/decoding of data structures. + Contains json, pickle, msgpack, and yaml serializers. + + Optionally installs support for YAML if the `PyYAML`_ package + is installed. + + Optionally installs support for `msgpack`_ if the `msgpack-python`_ + package is installed. + + + Exceptions + ---------- + + .. autoexception:: SerializerNotInstalled + + Serialization + ------------- + + .. autofunction:: encode + + .. autofunction:: decode + + .. autofunction:: raw_encode + + Registry + -------- + + .. autofunction:: register + + .. autodata:: registry + +.. _`cjson`: http://pypi.python.org/pypi/python-cjson/ +.. _`simplejson`: http://code.google.com/p/simplejson/ +.. _`Python 2.6+`: http://docs.python.org/library/json.html +.. _`PyYAML`: http://pyyaml.org/ +.. _`msgpack`: http://msgpack.sourceforge.net/ +.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/ diff --git a/docs/reference/kombu.simple.rst b/docs/reference/kombu.simple.rst index 95ecd2de..b981f6dd 100644 --- a/docs/reference/kombu.simple.rst +++ b/docs/reference/kombu.simple.rst @@ -1,11 +1,89 @@ -========================================================== - Simple Interface - kombu.simple -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.simple .. automodule:: kombu.simple - :members: - :undoc-members: + + .. contents:: + :local: + + Persistent + ---------- + + .. autoclass:: SimpleQueue + + .. attribute:: channel + + Current channel + + .. attribute:: producer + + :class:`~kombu.messaging.Producer` used to publish messages. + + .. attribute:: consumer + + :class:`~kombu.messaging.Consumer` used to receive messages. + + .. attribute:: no_ack + + flag to enable/disable acknowledgements. + + .. attribute:: queue + + :class:`~kombu.entity.Queue` to consume from (if consuming). + + .. attribute:: queue_opts + + Additional options for the queue declaration. + + .. attribute:: exchange_opts + + Additional options for the exchange declaration. + + .. automethod:: get + .. automethod:: get_nowait + .. automethod:: put + .. automethod:: clear + .. automethod:: __len__ + .. automethod:: qsize + .. automethod:: close + + Buffer + ------ + + .. autoclass:: SimpleBuffer + + .. attribute:: channel + + Current channel + + .. attribute:: producer + + :class:`~kombu.messaging.Producer` used to publish messages. + + .. attribute:: consumer + + :class:`~kombu.messaging.Consumer` used to receive messages. + + .. attribute:: no_ack + + flag to enable/disable acknowledgements. + + .. attribute:: queue + + :class:`~kombu.entity.Queue` to consume from (if consuming). + + .. attribute:: queue_opts + + Additional options for the queue declaration. + + .. attribute:: exchange_opts + + Additional options for the exchange declaration. + + .. automethod:: get + .. automethod:: get_nowait + .. automethod:: put + .. automethod:: clear + .. automethod:: __len__ + .. automethod:: qsize + .. automethod:: close + diff --git a/docs/reference/kombu.transport.base.rst b/docs/reference/kombu.transport.base.rst index fef20eaa..58490138 100644 --- a/docs/reference/kombu.transport.base.rst +++ b/docs/reference/kombu.transport.base.rst @@ -1,11 +1,45 @@ -========================================================== - Base Transport Interface - kombu.transport.base -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.transport.base .. automodule:: kombu.transport.base - :members: - :undoc-members: + + .. contents:: + :local: + + Message + ------- + + .. autoclass:: Message + + .. autoattribute:: payload + .. autoattribute:: channel + .. autoattribute:: delivery_tag + .. autoattribute:: content_type + .. autoattribute:: content_encoding + .. autoattribute:: delivery_info + .. autoattribute:: headers + .. autoattribute:: properties + .. autoattribute:: body + .. autoattribute:: acknowledged + + .. automethod:: ack + .. automethod:: reject + .. automethod:: requeue + .. automethod:: decode + + Transport + --------- + + .. autoclass:: Transport + + .. autoattribute:: client + .. autoattribute:: default_port + .. autoattribute:: connection_errors + .. autoattribute:: channel_errors + + .. automethod:: establish_connection + .. automethod:: close_connection + .. automethod:: create_channel + .. automethod:: close_channel + .. automethod:: drain_events + + diff --git a/docs/reference/kombu.transport.memory.rst b/docs/reference/kombu.transport.memory.rst index e8ba5b33..c712b135 100644 --- a/docs/reference/kombu.transport.memory.rst +++ b/docs/reference/kombu.transport.memory.rst @@ -1,11 +1,20 @@ -========================================================== - In-memory Transport - kombu.transport.memory -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.transport.memory .. automodule:: kombu.transport.memory - :members: - :undoc-members: + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/docs/reference/kombu.transport.pyamqplib.rst b/docs/reference/kombu.transport.pyamqplib.rst index 9dbbe516..ab35d156 100644 --- a/docs/reference/kombu.transport.pyamqplib.rst +++ b/docs/reference/kombu.transport.pyamqplib.rst @@ -1,11 +1,36 @@ -========================================================== - amqplib Transport - kombu.transport.pyamqplib -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.transport.pyamqplib .. automodule:: kombu.transport.pyamqplib - :members: - :undoc-members: + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Connection + ---------- + + .. autoclass:: Connection + :members: + :undoc-members: + :inherited-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: + + Message + ------- + + .. autoclass:: Message + :members: + :undoc-members: + diff --git a/docs/reference/kombu.transport.pypika.rst b/docs/reference/kombu.transport.pypika.rst index a7a0c8c0..2c9af997 100644 --- a/docs/reference/kombu.transport.pypika.rst +++ b/docs/reference/kombu.transport.pypika.rst @@ -1,11 +1,42 @@ -========================================================== - pika Transport - kombu.transport.pypika -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.transport.pypika .. automodule:: kombu.transport.pypika - :members: - :undoc-members: + + .. contents:: + :local: + + Transports + ---------- + + .. autoclass:: AsyncoreTransport + :members: + :undoc-members: + + .. autoclass:: SyncTransport + :members: + :undoc-members: + + Connections + ----------- + + .. autoclass:: AsyncoreConnection + :members: + :undoc-members: + + .. autoclass:: BlockingConnection + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: + + Message + ------- + + .. autoclass:: Message + :members: + :undoc-members: diff --git a/docs/reference/kombu.transport.pyredis.rst b/docs/reference/kombu.transport.pyredis.rst index 0e2b489b..8e7e564c 100644 --- a/docs/reference/kombu.transport.pyredis.rst +++ b/docs/reference/kombu.transport.pyredis.rst @@ -1,11 +1,20 @@ -========================================================== - Redis Transport - kombu.transport.pyredis -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.transport.pyredis .. automodule:: kombu.transport.pyredis - :members: - :undoc-members: + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/docs/reference/kombu.transport.rst b/docs/reference/kombu.transport.rst index 6c9c209c..51dd7ec3 100644 --- a/docs/reference/kombu.transport.rst +++ b/docs/reference/kombu.transport.rst @@ -1,11 +1,23 @@ -========================================================== - Transports - kombu.transport -========================================================== - -.. contents:: - :local: .. currentmodule:: kombu.transport -.. automodule:: kombu.transport - :members: - :undoc-members: +.. automodule:: kombu.transpor + + .. contents:: + :local: + + Data + ---- + + .. data:: DEFAULT_TRANSPORT + + Default transport used when no transport specified. + + .. data:: TRANSPORT_ALIASES + + Mapping of transport aliases/class names. + + Functions + --------- + + .. autofunction:: get_transport_cls + .. autofunction:: resolve_transport diff --git a/docs/reference/kombu.transport.virtual.exchange.rst b/docs/reference/kombu.transport.virtual.exchange.rst index c775a463..220b0176 100644 --- a/docs/reference/kombu.transport.virtual.exchange.rst +++ b/docs/reference/kombu.transport.virtual.exchange.rst @@ -1,7 +1,35 @@ -.. contents:: - :local: .. currentmodule:: kombu.transport.virtual.exchange .. automodule:: kombu.transport.virtual.exchange - :members: - :undoc-members: + + .. contents:: + :local: + + Direct + ------ + + .. autoclass:: DirectExchange + :members: + :undoc-members: + + Topic + ----- + + .. autoclass:: TopicExchange + :members: + :undoc-members: + + Fanout + ------ + + .. autoclass:: FanoutExchange + :members: + :undoc-members: + + Interface + --------- + + .. autoclass:: ExchangeType + :members: + :undoc-members: + diff --git a/docs/reference/kombu.transport.virtual.rst b/docs/reference/kombu.transport.virtual.rst index d7f5381e..8e60977a 100644 --- a/docs/reference/kombu.transport.virtual.rst +++ b/docs/reference/kombu.transport.virtual.rst @@ -1,7 +1,117 @@ -.. contents:: - :local: .. currentmodule:: kombu.transport.virtual .. automodule:: kombu.transport.virtual - :members: - :undoc-members: + + .. contents:: + :local: + + Transports + ---------- + + .. autoclass:: Transport + + .. autoattribute:: Channel + + .. autoattribute:: Cycle + + .. autoattribute:: interval + + .. autoattribute:: default_port + + .. autoattribute:: state + + .. autoattribute:: cycle + + .. automethod:: establish_connection + + .. automethod:: close_connection + + .. automethod:: create_channel + + .. automethod:: close_channel + + .. automethod:: drain_events + + Channel + ------- + + .. autoclass:: AbstractChannel + :members: + + .. autoclass:: Channel + + .. autoattribute:: Message + + .. autoattribute:: state + + .. autoattribute:: qos + + .. autoattribute:: do_restore + + .. autoattribute:: exchange_types + + .. automethod:: exchange_declare + + .. automethod:: exchange_delete + + .. automethod:: queue_declare + + .. automethod:: queue_delete + + .. automethod:: queue_bind + + .. automethod:: queue_purge + + .. automethod:: basic_publish + + .. automethod:: basic_consume + + .. automethod:: basic_cancel + + .. automethod:: basic_get + + .. automethod:: basic_ack + + .. automethod:: basic_recover + + .. automethod:: basic_reject + + .. automethod:: basic_qos + + .. automethod:: get_table + + .. automethod:: typeof + + .. automethod:: drain_events + + .. automethod:: prepare_message + + .. automethod:: message_to_python + + .. automethod:: flow + + .. automethod:: close + + Message + ------- + + .. autoclass:: Message + :members: + :undoc-members: + :inherited-members: + + Quality Of Service + ------------------ + + .. autoclass:: QoS + :members: + :undoc-members: + :inherited-members: + + In-memory State + --------------- + + .. autoclass:: BrokerState + :members: + :undoc-members: + :inherited-members: diff --git a/docs/reference/kombu.transport.virtual.scheduling.rst b/docs/reference/kombu.transport.virtual.scheduling.rst index b59994ab..5eca4b91 100644 --- a/docs/reference/kombu.transport.virtual.scheduling.rst +++ b/docs/reference/kombu.transport.virtual.scheduling.rst @@ -1,7 +1,7 @@ .. contents:: :local: -.. currentmodule:: kombu.transport.scheduling +.. currentmodule:: kombu.transport.virtual.scheduling -.. automodule:: kombu.transport.scheduling +.. automodule:: kombu.transport.virtual.scheduling :members: :undoc-members: diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index 37e516c3..7c495cfa 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -19,8 +19,8 @@ method:: >>> connection.connect() This connection will use the default connection settings, which is using -the localhost host, default port, username ``guest``, -password ``guest`` and virtual host "/". A connection without arguments +the localhost host, default port, username `guest`, +password `guest` and virtual host "/". A connection without arguments is the same as:: >>> BrokerConnection(hostname="localhost", @@ -32,7 +32,7 @@ is the same as:: The default port is transport specific, for AMQP this is 6379. Other fields may also have different meaning depending on the transport -used. For example, the Redis transport uses the ``virtual_host`` argument as +used. For example, the Redis transport uses the `virtual_host` argument as the redis database number. See the :class:`~kombu.connection.BrokerConnection` reference documentation diff --git a/kombu/abstract.py b/kombu/abstract.py index 50c70011..5680c4f4 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -1,9 +1,21 @@ +""" +kombu.compression +================= + +Object utilities. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from copy import copy from kombu.exceptions import NotBoundError class Object(object): + """Common baseclass supporting automatic kwargs->attributes handling, + and cloning.""" attrs = () def __init__(self, *args, **kwargs): @@ -29,6 +41,7 @@ class MaybeChannelBound(Object): _is_bound = False def __call__(self, channel): + """`self(channel) -> self.bind(channel)`""" return self.bind(channel) def bind(self, channel): @@ -65,11 +78,12 @@ class MaybeChannelBound(Object): @property def is_bound(self): - """Returns ``True`` if the entity is bound.""" + """Flag set if the channel is bound.""" return self._is_bound and self._channel is not None @property def channel(self): + """Current channel if the object is bound.""" if self._channel is None: raise NotBoundError( "Can't call method on %s not bound to a channel" % ( diff --git a/kombu/compat.py b/kombu/compat.py index cef10af6..a8dfc6c7 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -1,10 +1,22 @@ +""" +kombu.compat +============ + +Carrot compatible interface for :class:`Publisher` and :class:`Producer`. + +See http://packages.python.org/pypi/carrot for documentation. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from itertools import count from kombu import entity from kombu import messaging -def iterconsume(connection, consumer, no_ack=False, limit=None): +def _iterconsume(connection, consumer, no_ack=False, limit=None): consumer.consume(no_ack=no_ack) for iteration in count(0): if limit and iteration >= limit: @@ -161,7 +173,7 @@ class Consumer(messaging.Consumer): return self.purge() def iterconsume(self, limit=None, no_ack=None): - return iterconsume(self.connection, self, no_ack, limit) + return _iterconsume(self.connection, self, no_ack, limit) def wait(self, limit=None): it = self.iterconsume(limit) @@ -176,15 +188,25 @@ class Consumer(messaging.Consumer): yield item -class _CSet(messaging.Consumer): +class ConsumerSet(messaging.Consumer): - def __init__(self, connection, *args, **kwargs): + def __init__(self, connection, from_dict=None, consumers=None, + callbacks=None, **kwargs): self.connection = connection self.backend = connection.channel() - super(_CSet, self).__init__(self.backend, *args, **kwargs) + + queues = [] + if consumers: + for consumer in consumers: + map(queues.extend, consumer.queues) + if from_dict: + for queue_name, queue_options in from_dict.items(): + queues.append(entry_to_queue(queue_name, **queue_options)) + + super(_CSet, self).__init__(self.backend, queues, **kwargs) def iterconsume(self, limit=None, no_ack=False): - return iterconsume(self.connection, self, no_ack, limit) + return _iterconsume(self.connection, self, no_ack, limit) def discard_all(self): return self.purge() @@ -200,17 +222,3 @@ class _CSet(messaging.Consumer): def close(self): self.cancel() self.channel.close() - - -def ConsumerSet(connection, from_dict=None, consumers=None, - callbacks=None, **kwargs): - - queues = [] - if consumers: - for consumer in consumers: - map(queues.extend, consumer.queues) - if from_dict: - for queue_name, queue_options in from_dict.items(): - queues.append(entry_to_queue(queue_name, **queue_options)) - - return _CSet(connection, queues, **kwargs) diff --git a/kombu/compression.py b/kombu/compression.py index ebe9ee96..e6f04702 100644 --- a/kombu/compression.py +++ b/kombu/compression.py @@ -1,29 +1,67 @@ +""" +kombu.compression +================= + +Compression utilities. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" + _aliases = {} _encoders = {} _decoders = {} def register(encoder, decoder, content_type, aliases=[]): + """Register new compression method. + + :param encoder: Function used to compress text. + :param decoder: Function used to decompress previously compressed text. + :param content_type: The mime type this compression method identifies as. + :param aliases: A list of names to associate with this compression method. + + """ _encoders[content_type] = encoder _decoders[content_type] = decoder _aliases.update((alias, content_type) for alias in aliases) +def encoders(): + """Returns a list of available compression methods.""" + return _encoders.keys() + + def get_encoder(t): + """Get encoder by alias name.""" t = _aliases.get(t, t) return _encoders[t], t def get_decoder(t): + """Get decoder by alias name.""" return _decoders[_aliases.get(t, t)] def compress(body, content_type): + """Compress text. + + :param body: The text to compress. + :param content_type: mime-type of compression method to use. + + """ encoder, content_type = get_encoder(content_type) return encoder(body), content_type def decompress(body, content_type): + """Decompress compressed text. + + :param body: Previously compressed text to uncompress. + :param content_type: mime-type of compression method used. + + """ return get_decoder(content_type)(body) diff --git a/kombu/connection.py b/kombu/connection.py index c30db523..7ab6f8a9 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -1,3 +1,13 @@ +""" +kombu.connection +================ + +Broker connection and pools. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" import socket from copy import copy @@ -16,31 +26,22 @@ class BrokerConnection(object): :keyword hostname: Hostname/address of the server to connect to. Default is ``"localhost"``. - :keyword userid: Username. Default is ``"guest"``. - :keyword password: Password. Default is ``"guest"``. - :keyword virtual_host: Virtual host. Default is ``"/"``. - :keyword port: Port of the server. Default is transport specific. - :keyword insist: Insist on connecting to a server. In a configuration with multiple load-sharing servers, the insist option tells the server that the client is insisting on a connection to the specified server. Default is ``False``. - :keyword ssl: Use ssl to connect to the server. Default is ``False``. - :keyword transport: Transport class to use. Can be a class, or a string specifying the path to the class. (e.g. ``kombu.transport.pyamqplib.Transport``), or one of the aliases: ``amqplib``, ``pika``, ``redis``, ``memory``. - :keyword connect_timeout: Timeout in seconds for connecting to the server. May not be suported by the specified transport. - **Usage** Creating a connection:: @@ -48,7 +49,8 @@ class BrokerConnection(object): >>> conn = BrokerConnection("rabbit.example.com") The connection is established lazily when needed. If you need the - connection to be established, then do so expliclty using :meth:`connect`:: + connection to be established, then force it to do so using + :meth:`connect`:: >>> conn.connect() @@ -99,8 +101,7 @@ class BrokerConnection(object): """ return self.transport.drain_events(self.connection, **kwargs) - def close(self): - """Close the connection (if open).""" + def _close(self): try: if self._connection: try: @@ -112,6 +113,11 @@ class BrokerConnection(object): pass self._closed = True + def release(self): + """Close the connection (if open).""" + self._close() + close = release + def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30): """Ensure we have a connection to the server. @@ -205,12 +211,7 @@ class BrokerConnection(object): def create_transport(self): return self.get_transport_cls()(client=self) - create_backend = create_transport # FIXME - - def clone(self, **kwargs): - """Create a copy of the connection with the same connection - settings.""" - return self.__class__(**dict(self.info(), **kwargs)) + create_backend = create_transport # FIXME def get_transport_cls(self): """Get the currently used transport class.""" @@ -219,6 +220,11 @@ class BrokerConnection(object): transport_cls = get_transport_cls(transport_cls) return transport_cls + def clone(self, **kwargs): + """Create a copy of the connection with the same connection + settings.""" + return self.__class__(**dict(self.info(), **kwargs)) + def info(self): """Get connection info.""" transport_cls = self.transport_cls or "amqplib" @@ -240,7 +246,7 @@ class BrokerConnection(object): :keyword limit: Maximum number of active connections. Default is no limit. - :keyword preload: Number of connections to preload + :keyword preload: Number of connections to preload when the pool is created. Default is 0. *Example usage*:: @@ -267,7 +273,7 @@ class BrokerConnection(object): :keyword limit: Maximum number of active channels. Default is no limit. - :keyword preload: Number of channels to preload + :keyword preload: Number of channels to preload when the pool is created. Default is 0. *Example usage*:: @@ -344,6 +350,7 @@ class BrokerConnection(object): return "<BrokerConnection: %s>" % ( ", ".join("%s=%r" % (item, info[item]) for item in info.keys()[:8])) + def __copy__(self): """``x.__copy__() <==> copy(x)``""" return self.clone() @@ -355,10 +362,17 @@ class BrokerConnection(object): return self def __exit__(self, *args): - self.close() + self.release() @property def connection(self): + """The underlying connection object. + + .. warning:: + This instance is transport specific, so do not + depend on the interface of this object. + + """ if self._closed: return if not self._connection: @@ -449,7 +463,6 @@ class Resource(object): self._resource.put_nowait(resource) - class ConnectionPool(Resource): LimitExceeded = exceptions.ConnectionLimitExceeded diff --git a/kombu/entity.py b/kombu/entity.py index 6cc44d63..20f6f097 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -1,3 +1,13 @@ +""" +kombu.entity +================ + +Exchange and Queue declarations. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from kombu.abstract import MaybeChannelBound TRANSIENT_DELIVERY_MODE = 1 @@ -7,7 +17,7 @@ DELIVERY_MODES = {"transient": TRANSIENT_DELIVERY_MODE, class Exchange(MaybeChannelBound): - """An Exchange. + """An Exchange declaration. :keyword name: See :attr:`name`. :keyword type: See :attr:`type`. @@ -28,34 +38,34 @@ class Exchange(MaybeChannelBound): also define additional exchange types, so see your broker manual for more information about available exchange types. - * ``direct`` (*default*) + * `direct` (*default*) Direct match between the routing key in the message, and the routing criteria used when a queue is bound to this exchange. - * ``topic`` + * `topic` Wildcard match between the routing key and the routing pattern specified in the exchange/queue binding. The routing key is - treated as zero or more words delimited by ``"."`` and - supports special wildcard characters. ``"*"`` matches a - single word and ``"#"`` matches zero or more words. + treated as zero or more words delimited by `"."` and + supports special wildcard characters. `"*"` matches a + single word and `"#"` matches zero or more words. - * ``fanout`` + * `fanout` Queues are bound to this exchange with no arguments. Hence any message sent to this exchange will be forwarded to all queues bound to this exchange. - * ``headers`` + * `headers` Queues are bound to this exchange with a table of arguments containing headers and values (optional). A special argument named "x-match" determines the matching algorithm, where - ``"all"`` implies an ``AND`` (all pairs must match) and - ``"any"`` implies ``OR`` (at least one pair must match). + `"all"` implies an `AND` (all pairs must match) and + `"any"` implies `OR` (at least one pair must match). - :attr:`arguments`` is used to specify the arguments. + :attr:`arguments` is used to specify the arguments. This description of AMQP exchange types was shamelessly stolen from the blog post `AMQP in 10 minutes: Part 4`_ by @@ -72,18 +82,19 @@ class Exchange(MaybeChannelBound): Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged when a server restarts. - Default is ``True``. + Default is :const:`True`. .. attribute:: auto_delete If set, the exchange is deleted when all queues have finished - using it. Default is ``False``. + using it. Default is :const:`False`. .. attribute:: delivery_mode - The default delivery mode used for messages. The value is an integer. + The default delivery mode used for messages. The value is an integer, + or alias string. - * 1 or "transient" + * 1 or `"transient"` The message is transient. Which means it is stored in memory only, and is lost if the server dies or restarts. @@ -93,38 +104,16 @@ class Exchange(MaybeChannelBound): stored both in-memory, and on disk, and therefore preserved if the server dies or restarts. - The default value is ``2`` (persistent). + The default value is 2 (persistent). .. attribute:: arguments Additional arguments to specify when the exchange is declared. - - **Usage** - - Example creating an exchange declaration:: - - >>> news_exchange = Exchange("news", type="topic") - - For now ``news_exchange`` is just a declaration, you can't perform - actions on it. It just describes the name and options for the exchange. - - The exchange can be bound or unbound. Bound means the exchange is - associated with a channel and operations can be performed on it. - To bind the exchange you call the exchange with the channel as argument:: - - >>> bound_exchange = news_exchange(channel) - - Now you can perform operations like :meth:`declare` or :meth:`delete`:: - - >>> bound_exchange.declare() - >>> message = bound_exchange.Message("Cure for cancer found!") - >>> bound_exchange.publish(message, routing_key="news.science") - >>> bound_exchange.delete() - """ TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE + name = "" type = "direct" durable = True @@ -150,7 +139,7 @@ class Exchange(MaybeChannelBound): Creates the exchange on the broker. :keyword nowait: If set the server will not respond, and a - response will not be waited for. Default is ``False``. + response will not be waited for. Default is :const:`False`. """ return self.channel.exchange_declare(exchange=self.name, @@ -160,9 +149,9 @@ class Exchange(MaybeChannelBound): arguments=self.arguments, nowait=nowait) - def Message(self, body, delivery_mode=None, - priority=None, content_type=None, content_encoding=None, - properties=None, headers=None): + def Message(self, body, delivery_mode=None, priority=None, + content_type=None, content_encoding=None, properties=None, + headers=None): """Create message instance to be sent with :meth:`publish`. :param body: Message body. @@ -170,7 +159,7 @@ class Exchange(MaybeChannelBound): :keyword delivery_mode: Set custom delivery mode. Defaults to :attr:`delivery_mode`. - :keyword priority: Message priority, ``0`` to ``9``. (currently not + :keyword priority: Message priority, 0 to 9. (currently not supported by RabbitMQ). :keyword content_type: The messages content_type. If content_type @@ -219,10 +208,10 @@ class Exchange(MaybeChannelBound): """Delete the exchange declaration on server. :keyword if_unused: Delete only if the exchange has no bindings. - Default is ``False``. + Default is :const:`False`. :keyword nowait: If set the server will not respond, and a - response will not be waited for. Default is ``False``. + response will not be waited for. Default is :const:`False`. """ return self.channel.exchange_delete(exchange=self.name, @@ -275,12 +264,12 @@ class Queue(MaybeChannelBound): Matches the routing key property of the message by a primitive pattern matching scheme. The message routing key then consists - of words separated by dots (``"."``, like domain names), and - two special characters are available; star (``"*"``) and hash - (``"#"``). The star matches any word, and the hash matches - zero or more words. For example ``"*.stock.#"`` matches the - routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not - ``"stock.nasdaq"``. + of words separated by dots (`"."`, like domain names), and + two special characters are available; star (`"*"`) and hash + (`"#"`). The star matches any word, and the hash matches + zero or more words. For example `"*.stock.#"` matches the + routing keys `"usd.stock"` and `"eur.stock.db"` but not + `"stock.nasdaq"`. .. attribute:: channel @@ -295,7 +284,7 @@ class Queue(MaybeChannelBound): messages, although it does not make sense to send persistent messages to a transient queue. - Default is ``True``. + Default is :const:`True`. .. attribute:: exclusive @@ -303,7 +292,7 @@ class Queue(MaybeChannelBound): current connection. Setting the 'exclusive' flag always implies 'auto-delete'. - Default is ``False``. + Default is :const:`False`. .. attribute:: auto_delete @@ -321,32 +310,7 @@ class Queue(MaybeChannelBound): Additional arguments used when binding the queue. - **Usage** - - Example creating a queue using our exchange in the :class:`Exchange` - example:: - - >>> science_news = Queue("science_news", - ... exchange=news_exchange, - ... routing_key="news.science") - - For now ``science_news`` is just a declaration, you can't perform - actions on it. It just describes the name and options for the queue. - - The queue can be bound or unbound. Bound means the queue is - associated with a channel and operations can be performed on it. - To bind the queue you call the queue instance with the channel as - an argument:: - - >>> bound_science_news = science_news(channel) - - Now you can perform operations like :meth:`declare` or :meth:`purge`:: - - >>> bound_sicence_news.declare() - >>> bound_science_news.purge() - >>> bound_science_news.delete() """ - name = "" exchange = None routing_key = "" @@ -389,7 +353,7 @@ class Queue(MaybeChannelBound): """Declare queue on the server. :keyword nowait: Do not wait for a reply. - :keyword passive: If set, the server will not create the queue. + :keyword passive: If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state. @@ -414,7 +378,6 @@ class Queue(MaybeChannelBound): arguments=self.binding_arguments, nowait=nowait) - def get(self, no_ack=None): """Poll the server for a new message. @@ -497,6 +460,4 @@ class Queue(MaybeChannelBound): return super(Queue, self).__repr__( "Queue %s -> %s -> %s" % (self.name, self.exchange, - self.routing_key)) - -Binding = Queue + self.routing_key)) diff --git a/kombu/exceptions.py b/kombu/exceptions.py index fe88ac38..fb4db655 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -1,17 +1,33 @@ +""" +kombu.exceptions +================ + +Exceptions. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" + + class NotBoundError(Exception): """Trying to call channel dependent method on unbound entity.""" + pass class MessageStateError(Exception): """The message has already been acknowledged.""" + pass class EnsureExhausted(Exception): """ensure() limit exceeded.""" + pass class TimeoutError(Exception): """Operation timed out.""" + pass class LimitExceeded(Exception): diff --git a/kombu/messaging.py b/kombu/messaging.py index 88c4d49c..bac4e320 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -1,9 +1,18 @@ +""" +kombu.messaging +=============== + +Sending and receiving messages. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from itertools import count -from kombu import serialization from kombu.compression import compress from kombu.entity import Exchange, Queue -from kombu.entity import Binding # TODO Remove +from kombu.serialization import encode from kombu.utils import maybe_list @@ -13,43 +22,38 @@ class Producer(object): :param channel: Connection channel. :keyword exchange: Default exchange. :keyword routing_key: Default routing key. - :keyword serializer: Default serializer. Default is ``"json"``. + :keyword serializer: Default serializer. Default is `"json"`. :keyword compression: Default compression method. Default is no compression. :keyword auto_declare: Automatically declare the exchange - at instantiation. Default is ``True``. + at instantiation. Default is :const:`True`. :keyword on_return: Callback to call for undeliverable messages, - when the ``mandatory`` or ``imediate`` arguments to + when the `mandatory` or `imediate` arguments to :meth:`publish` is used. This callback needs the following - signature: ``(exception, exchange, routing_key, message)``. - - .. attribute:: channel - - The connection channel to use. - - .. attribute:: exchange + signature: `(exception, exchange, routing_key, message)`. - Exchange to publish to. - - .. attribute:: routing_key - - Default routing key. + """ + #: The connection channel used. + channel = None - .. attribute:: serializer + #: Default exchange. + exchange = Exchange("") - Default serializer to use. Default is autodetect. + # Default routing key. + routing_key = "" - .. attribute:: auto_declare + #: Default serializer to use. Default is JSON. + serializer = None - By default the exchange is declared at instantiation. - If you want to declare manually you can set this to ``False``. + #: Default compression method. Disabled by default. + compression = None - """ - exchange = Exchange("") - serializer = None + #: By default the exchange is declared at instantiation. + #: If you want to declare manually then you can set this + #: to :const:`False`. auto_declare = True - routing_key = "" - compression = None + + #: Basic return callback. on_return = None def __init__(self, channel, exchange=None, routing_key=None, @@ -71,47 +75,16 @@ class Producer(object): if self.on_return: self.channel.events["basic_return"].append(self.on_return) - def revive(self, channel): - self.channel = channel - self.exchange.revive(channel) - def declare(self): """Declare the exchange. This is done automatically at instantiation if :attr:`auto_declare` - is set. + is set to :const:`True`. """ if self.exchange.name: self.exchange.declare() - def _prepare(self, body, serializer=None, - content_type=None, content_encoding=None, compression=None, - headers=None): - - # No content_type? Then we're serializing the data internally. - if not content_type: - serializer = serializer or self.serializer - (content_type, content_encoding, - body) = serialization.encode(body, serializer=serializer) - else: - # If the programmer doesn't want us to serialize, - # make sure content_encoding is set. - if isinstance(body, unicode): - if not content_encoding: - content_encoding = 'utf-8' - body = body.encode(content_encoding) - - # If they passed in a string, we can't know anything - # about it. So assume it's binary data. - elif not content_encoding: - content_encoding = 'binary' - - if compression: - body, headers["compression"] = compress(body, compression) - - return body, content_type, content_encoding - def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, @@ -151,6 +124,38 @@ class Producer(object): return self.exchange.publish(message, routing_key, mandatory, immediate, exchange=exchange) + def revive(self, channel): + """Revive the producer after connection loss.""" + self.channel = channel + self.exchange.revive(channel) + + def _prepare(self, body, serializer=None, + content_type=None, content_encoding=None, compression=None, + headers=None): + + # No content_type? Then we're serializing the data internally. + if not content_type: + serializer = serializer or self.serializer + (content_type, content_encoding, + body) = encode(body, serializer=serializer) + else: + # If the programmer doesn't want us to serialize, + # make sure content_encoding is set. + if isinstance(body, unicode): + if not content_encoding: + content_encoding = 'utf-8' + body = body.encode(content_encoding) + + # If they passed in a string, we can't know anything + # about it. So assume it's binary data. + elif not content_encoding: + content_encoding = 'binary' + + if compression: + body, headers["compression"] = compress(body, compression) + + return body, content_type, content_encoding + class Consumer(object): """Message consumer. @@ -162,43 +167,38 @@ class Consumer(object): :keyword callbacks: see :attr:`callbacks`. :keyword on_decode_error: see :attr:`on_decode_error`. - .. attribute:: channel - - The connection channel to use. - - .. attribute:: queues - - A single :class:`~kombu.entity.Queue`, or a list of queues to - consume from. - - .. attribute:: auto_declare - - By default the entities will be declared at instantiation, - if you want to handle this manually you can set this to ``False``. - - .. attribute:: callbacks - - List of callbacks called in order when a message is received. - - The signature of the callbacks must take two arguments: - ``(body, message)``, which is the decoded message body and - the ``Message`` instance (a subclass of - :class:`~kombu.transport.base.Message`). - - .. attribute:: on_decode_error - - Callback called when a message can't be decoded. + """ + #: The connection channel to use. + channel = None - The signature of the callback must take two arguments: ``(message, - exc)``, which is the message that can't be decoded and the exception - that occured while trying to decode it. + #: A single :class:`~kombu.entity.Queue`, or a list of queues to + #: consume from. + queues = None - """ + #: Flag for message acknowledgment disabled/enabled. + #: Enabled by default. no_ack = False + + #: By default all entities will be declared at instantiation, if you + #: want to handle this manually you can set this to :const:`False`. auto_declare = True + + #: List of callbacks called in order when a message is received. + #: + #: The signature of the callbacks must take two arguments: + #: `(body, message)`, which is the decoded message body and + #: the `Message` instance (a subclass of + #: :class:`~kombu.transport.base.Message`). callbacks = None + + #: Callback called when a message can't be decoded. + #: + #: The signature of the callback must take two arguments: `(message, + #: exc)`, which is the message that can't be decoded and the exception + #: that occured while trying to decode it. on_decode_error = None - _next_tag = count(1).next # global + + _next_tag = count(1).next # global def __init__(self, channel, queues, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None): @@ -223,11 +223,6 @@ class Consumer(object): if self.auto_declare: self.declare() - def revive(self, channel): - for queue in self.queues: - queue.revive(channel) - self.channel = channel - def declare(self): """Declare queues, exchanges and bindings. @@ -238,62 +233,28 @@ class Consumer(object): for queue in self.queues: queue.declare() - def _basic_consume(self, queue, consumer_tag=None, - no_ack=no_ack, nowait=True): - if queue.name not in self._active_tags: - queue.consume(self._add_tag(queue, consumer_tag), - self._receive_callback, - no_ack=no_ack, - nowait=nowait) - - def consume(self, no_ack=None): - """Register consumer on server. - - """ - if no_ack is None: - no_ack = self.no_ack - H, T = self.queues[:-1], self.queues[-1] - for queue in H: - self._basic_consume(queue, no_ack=no_ack, nowait=True) - self._basic_consume(T, no_ack=no_ack, nowait=False) - - def receive(self, body, message): - """Method called when a message is received. - - This dispatches to the registered :attr:`callbacks`. - - :param body: The decoded message body. - :param message: The ``Message`` instance. - - :raises NotImplementedError: If no consumer callbacks have been - registered. - - """ - if not self.callbacks: - raise NotImplementedError("No consumer callbacks registered") - for callback in self.callbacks: - callback(body, message) - def register_callback(self, callback): """Register a new callback to be called when a message is received. The signature of the callback needs to accept two arguments: - ``(body, message)``, which is the decoded message body - and the ``Message`` instance (a subclass of + `(body, message)`, which is the decoded message body + and the `Message` instance (a subclass of :class:`~kombu.transport.base.Message`. """ self.callbacks.append(callback) - def purge(self): - """Purge messages from all queues. - - **WARNING**: This will *delete all ready messages*, there is no - undo operation available. + def consume(self, no_ack=None): + """Register consumer on server. """ - return sum(queue.purge() for queue in self.queues) + if no_ack is None: + no_ack = self.no_ack + H, T = self.queues[:-1], self.queues[-1] + for queue in H: + self._basic_consume(queue, no_ack=no_ack, nowait=True) + self._basic_consume(T, no_ack=no_ack, nowait=False) def cancel(self): """End all active queue consumers. @@ -307,8 +268,19 @@ class Consumer(object): self._active_tags.clear() def cancel_by_queue(self, queue): + """Cancel consumer by queue name.""" self.channel.basic_cancel(self._active_tags[queue]) + def purge(self): + """Purge messages from all queues. + + .. warning:: + This will *delete all ready messages*, there is no + undo operation available. + + """ + return sum(queue.purge() for queue in self.queues) + def flow(self, active): """Enable/disable flow from peer. @@ -359,13 +331,44 @@ class Consumer(object): on the specified channel. :keyword requeue: By default the messages will be redelivered - to the original recipient. With ``requeue`` set to true, the + to the original recipient. With `requeue` set to true, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber. """ return self.channel.basic_recover(requeue=requeue) + def receive(self, body, message): + """Method called when a message is received. + + This dispatches to the registered :attr:`callbacks`. + + :param body: The decoded message body. + :param message: The `Message` instance. + + :raises NotImplementedError: If no consumer callbacks have been + registered. + + """ + if not self.callbacks: + raise NotImplementedError("No consumer callbacks registered") + for callback in self.callbacks: + callback(body, message) + + def revive(self, channel): + """Revive consumer after connection loss.""" + for queue in self.queues: + queue.revive(channel) + self.channel = channel + + def _basic_consume(self, queue, consumer_tag=None, + no_ack=no_ack, nowait=True): + if queue.name not in self._active_tags: + queue.consume(self._add_tag(queue, consumer_tag), + self._receive_callback, + no_ack=no_ack, + nowait=nowait) + def _add_tag(self, queue, consumer_tag=None): tag = consumer_tag or str(self._next_tag()) self._active_tags[queue.name] = tag @@ -388,3 +391,6 @@ class Consumer(object): def __exit__(self, *args): self.cancel() + + def __repr__(self): + return "<Consumer: %s>" % (self.queues, ) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index d8af9c0f..8bb4af42 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -1,44 +1,16 @@ """ +kombu.pidbox +=============== -Creating the applications Mailbox -================================= +Generic process mailbox. -:: - - >>> mailbox = pidbox.Mailbox("celerybeat", type="direct") - - >>> @mailbox.handler - >>> def reload_schedule(state, **kwargs): - ... state["beat"].reload_schedule() - - >>> @mailbox.handler - >>> def connection_info(state, **kwargs): - ... return {"connection": state["connection"].info()} - -Example Node -============ - -:: - >>> connection = kombu.BrokerConnection() - >>> state = {"beat": beat, - "connection": connection} - >>> consumer = mailbox(connection).Node(hostname).listen() - >>> try: - ... while True: - ... connection.drain_events(timeout=1) - ... finally: - ... consumer.cancel() - -Example Client -============== - -:: - >>> mailbox.cast("reload_schedule") # cast is async. - >>> info = celerybeat.call("connection_info", timeout=1) +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. """ import socket + from copy import copy from itertools import count @@ -49,6 +21,21 @@ from kombu.utils import gen_unique_id, kwdict class Node(object): + #: hostname of the node. + hostname = None + + #: the :class:`Mailbox` this is a node for. + mailbox = None + + #: map of method name/handlers. + handlers = None + + #: current context (passed on to handlers) + state = None + + #: current channel. + channel = None + def __init__(self, hostname, state=None, channel=None, handlers=None, mailbox=None): self.channel = channel @@ -65,6 +52,10 @@ class Node(object): [self.mailbox.get_queue(self.hostname)], **options) + def handler(self, fun): + self.handlers[fun.__name__] = fun + return fun + def listen(self, channel=None, callback=None): callback = callback or self.handle_message consumer = self.Consumer(channel=channel, @@ -72,10 +63,6 @@ class Node(object): consumer.consume() return consumer - def reply(self, data, exchange, routing_key, **kwargs): - self.mailbox._publish_reply(data, exchange, routing_key, - channel=self.channel) - def dispatch_from_message(self, message): message = dict(message) method = message["method"] @@ -99,28 +86,43 @@ class Node(object): self.reply({self.hostname: reply}, **reply_to) return reply + def handle(self, method, arguments={}): + return self.handlers[method](self.state, **arguments) + def handle_call(self, method, arguments): return self.handle(method, arguments) def handle_cast(self, method, arguments): return self.handle(method, arguments) - def handler(self, fun): - self.handlers[fun.__name__] = fun - return fun - - def handle(self, method, arguments={}): - return self.handlers[method](self.state, **arguments) - def handle_message(self, message_data, message): self.dispatch_from_message(message_data) + def reply(self, data, exchange, routing_key, **kwargs): + self.mailbox._publish_reply(data, exchange, routing_key, + channel=self.channel) + class Mailbox(object): node_cls = Node exchange_fmt = "%s.pidbox" reply_exchange_fmt = "reply.%s.pidbox" + #: Name of application. + namespace = None + + #: Connection (if bound). + connection = None + + #: Exchange type (usually direct, or fanout for broadcast). + type = "direct" + + #: mailbox exchange (init by constructor). + exchange = None + + #: exchange to send replies to. + reply_exchange = None + def __init__(self, namespace, type="direct", connection=None): self.namespace = namespace self.connection = connection @@ -168,7 +170,6 @@ class Mailbox(object): return Queue("%s.%s.pidbox" % (hostname, self.namespace), exchange=self.exchange) - def _publish_reply(self, reply, exchange, routing_key, channel=None): chan = channel or self.connection.channel() try: diff --git a/kombu/serialization.py b/kombu/serialization.py index 0e6d1f66..2e5507aa 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -1,31 +1,20 @@ """ -Centralized support for encoding/decoding of data structures. -Requires a json library (`cjson`_, `simplejson`_, or `Python 2.6+`_). +kombu.serialization +=================== -Pickle support is built-in. +Serialization utilities. -Optionally installs support for ``YAML`` if the `PyYAML`_ package -is installed. - -Optionally installs support for `msgpack`_ if the `msgpack-python`_ -package is installed. - -.. _`cjson`: http://pypi.python.org/pypi/python-cjson/ -.. _`simplejson`: http://code.google.com/p/simplejson/ -.. _`Python 2.6+`: http://docs.python.org/library/json.html -.. _`PyYAML`: http://pyyaml.org/ -.. _`msgpack`: http://msgpack.sourceforge.net/ -.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/ +:copyright: (c) 2009 - 2010 +:license: BSD, see LICENSE for more details. """ import codecs -__all__ = ['SerializerNotInstalled', 'registry'] - class SerializerNotInstalled(StandardError): """Support for the requested serialization type is not installed""" + pass class SerializerRegistry(object): @@ -40,28 +29,6 @@ class SerializerRegistry(object): def register(self, name, encoder, decoder, content_type, content_encoding='utf-8'): - """Register a new encoder/decoder. - - :param name: A convenience name for the serialization method. - - :param encoder: A method that will be passed a python data structure - and should return a string representing the serialized data. - If ``None``, then only a decoder will be registered. Encoding - will not be possible. - - :param decoder: A method that will be passed a string representing - serialized data and should return a python data structure. - If ``None``, then only an encoder will be registered. - Decoding will not be possible. - - :param content_type: The mime-type describing the serialized - structure. - - :param content_encoding: The content encoding (character set) that - the :param:`decoder` method will be returning. Will usually be - ``utf-8``, ``us-ascii``, or ``binary``. - - """ if encoder: self._encoders[name] = (content_type, content_encoding, encoder) if decoder: @@ -72,7 +39,7 @@ class SerializerRegistry(object): Set the default serialization method used by this library. :param name: The name of the registered serialization method. - For example, ``json`` (default), ``pickle``, ``yaml``, + For example, `json` (default), `pickle`, `yaml`, `msgpack`, or any custom methods registered using :meth:`register`. :raises SerializerNotInstalled: If the serialization method @@ -86,34 +53,6 @@ class SerializerRegistry(object): "No encoder installed for %s" % name) def encode(self, data, serializer=None): - """ - Serialize a data structure into a string suitable for sending - as an AMQP message body. - - :param data: The message data to send. Can be a list, - dictionary or a string. - - :keyword serializer: An optional string representing - the serialization method you want the data marshalled - into. (For example, ``json``, ``raw``, or ``pickle``). - - If ``None`` (default), then `JSON`_ will be used, unless - ``data`` is a ``str`` or ``unicode`` object. In this - latter case, no serialization occurs as it would be - unnecessary. - - Note that if ``serializer`` is specified, then that - serialization method will be used even if a ``str`` - or ``unicode`` object is passed in. - - :returns: A three-item tuple containing the content type - (e.g., ``application/json``), content encoding, (e.g., - ``utf-8``) and a string containing the serialized - data. - - :raises SerializerNotInstalled: If the serialization method - requested is not available. - """ if serializer == "raw": return raw_encode(data) if serializer and not self._encoders.get(serializer): @@ -145,19 +84,6 @@ class SerializerRegistry(object): return content_type, content_encoding, payload def decode(self, data, content_type, content_encoding): - """Deserialize a data stream as serialized using ``encode`` - based on :param:`content_type`. - - :param data: The message data to deserialize. - - :param content_type: The content-type of the data. - (e.g., ``application/json``). - - :param content_encoding: The content-encoding of the data. - (e.g., ``utf-8``, ``binary``, or ``us-ascii``). - - :returns: The unserialized data. - """ content_type = content_type or 'application/data' content_encoding = (content_encoding or 'utf-8').lower() @@ -182,23 +108,87 @@ Global registry of serializers/deserializers. """ registry = SerializerRegistry() + """ .. function:: encode(data, serializer=default_serializer) -Encode data using the registry's default encoder. + Serialize a data structure into a string suitable for sending + as an AMQP message body. + + :param data: The message data to send. Can be a list, + dictionary or a string. + + :keyword serializer: An optional string representing + the serialization method you want the data marshalled + into. (For example, `json`, `raw`, or `pickle`). + + If :const:`None` (default), then json will be used, unless + `data` is a :class:`str` or :class:`unicode` object. In this + latter case, no serialization occurs as it would be + unnecessary. + + Note that if `serializer` is specified, then that + serialization method will be used even if a :class:`str` + or :class:`unicode` object is passed in. + :returns: A three-item tuple containing the content type + (e.g., `application/json`), content encoding, (e.g., + `utf-8`) and a string containing the serialized + data. + + :raises SerializerNotInstalled: If the serialization method + requested is not available. """ encode = registry.encode """ .. function:: decode(data, content_type, content_encoding): -Decode data using the registry's default decoder. + Deserialize a data stream as serialized using `encode` + based on `content_type`. + + :param data: The message data to deserialize. + + :param content_type: The content-type of the data. + (e.g., `application/json`). + + :param content_encoding: The content-encoding of the data. + (e.g., `utf-8`, `binary`, or `us-ascii`). + + :returns: The unserialized data. """ decode = registry.decode +""" +.. function:: register(name, encoder, decoder, content_type, + content_encoding="utf-8"): + Register a new encoder/decoder. + + :param name: A convenience name for the serialization method. + + :param encoder: A method that will be passed a python data structure + and should return a string representing the serialized data. + If :const:`None`, then only a decoder will be registered. Encoding + will not be possible. + + :param decoder: A method that will be passed a string representing + serialized data and should return a python data structure. + If :const:`None`, then only an encoder will be registered. + Decoding will not be possible. + + :param content_type: The mime-type describing the serialized + structure. + + :param content_encoding: The content encoding (character set) that + the `decoder` method will be returning. Will usually be + utf-8`, `us-ascii`, or `binary`. + + """ +register = registry.register + + def raw_encode(data): """Special case serializer.""" content_type = 'application/data' diff --git a/kombu/simple.py b/kombu/simple.py index e3968c50..eae5a5b6 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -1,3 +1,13 @@ +""" +kombu.simple +============ + +Simple interface. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" import socket from collections import deque @@ -22,20 +32,6 @@ class SimpleBase(object): self.buffer = deque() self.consumer.register_callback(self._receive) - def _receive(self, message_data, message): - self.buffer.append(message) - - def _consume(self): - if not self._consuming: - self.consumer.consume(no_ack=self.no_ack) - self._consuming = True - - def get_nowait(self): - m = self.queue.get(no_ack=self.no_ack) - if not m: - raise Empty() - return m - def get(self, block=True, timeout=None, sync=False): if block: return self.get_nowait() @@ -54,6 +50,12 @@ class SimpleBase(object): elapsed += time() - time_start remaining = timeout - elapsed + def get_nowait(self): + m = self.queue.get(no_ack=self.no_ack) + if not m: + raise Empty() + return m + def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs): self.producer.publish(message, @@ -75,10 +77,19 @@ class SimpleBase(object): self.channel.close() self.consumer.cancel() + def _receive(self, message_data, message): + self.buffer.append(message) + + def _consume(self): + if not self._consuming: + self.consumer.consume(no_ack=self.no_ack) + self._consuming = True + def __del__(self): self.close() def __len__(self): + """`len(self) -> self.qsize()`""" return self.qsize() diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index b2116edb..00f62e7f 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -28,7 +28,7 @@ class test_Connection(unittest.TestCase): self.assertTrue(conn.connection.connected) conn.__exit__() self.assertIsNone(conn.connection) - conn.close() # again + conn.close() # again class ResourceCase(unittest.TestCase): diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 1f8130e4..a90dc51b 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -240,12 +240,12 @@ class test_Consumer(unittest.TestCase): channel = self.connection.channel() b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) - received = [] + def callback(message_data, message): received.append(message_data) message.ack() - message.payload # trigger cache + message.payload # trigger cache consumer.register_callback(callback) consumer._receive_callback({u"foo": u"bar"}) @@ -339,8 +339,8 @@ class test_Consumer(unittest.TestCase): b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) consumer.channel.throw_decode_error = True - thrown = [] + def on_decode_error(msg, exc): thrown.append((msg.body, exc)) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 4e406149..a6de13c9 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -1,3 +1,13 @@ +""" +kombu.transport +=============== + +Built-in transports. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" import sys from kombu.utils import rpartition @@ -37,7 +47,7 @@ def get_transport_cls(transport=None): "kombu.transport.pyamqplib.Transport" - If the name does not include "``.``" (is not fully qualified), + If the name does not include `"."` (is not fully qualified), the alias table will be consulted. """ diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 23033394..2fcdf35b 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -1,8 +1,14 @@ """ +kombu.transport.base +==================== -Transport base classes. +Base transport interface. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. """ + from kombu import serialization from kombu.compression import decompress from kombu.exceptions import MessageStateError @@ -16,6 +22,30 @@ class Message(object): MessageStateError = MessageStateError + #: The channel the message was received on. + channel = None + + #: Delivery tag used to identify the message in this channel. + delivery_tag = None + + #: Content type used to identify the type of content. + content_type = None + + #: Content encoding used to identify the text encoding of the body. + content_encoding = None + + #: Additional delivery information. + delivery_info = None + + #: Message headers + headers = None + + #: Application properties + properties = None + + #: Raw message body (may be serialized), see :attr:`payload` instead. + body = None + def __init__(self, channel, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info={}, properties=None, headers=None, @@ -35,19 +65,6 @@ class Message(object): if compression: self.body = decompress(self.body, compression) - def decode(self): - """Deserialize the message body, returning the original - python structure sent by the publisher.""" - return serialization.decode(self.body, self.content_type, - self.content_encoding) - - @property - def payload(self): - """The decoded message.""" - if not self._decoded_cache: - self._decoded_cache = self.decode() - return self._decoded_cache - def ack(self): """Acknowledge this message as being processed., This will remove the message from the queue. @@ -93,29 +110,54 @@ class Message(object): self.channel.basic_reject(self.delivery_tag, requeue=True) self._state = "REQUEUED" + def decode(self): + """Deserialize the message body, returning the original + python structure sent by the publisher.""" + return serialization.decode(self.body, self.content_type, + self.content_encoding) + @property def acknowledged(self): + """Set to true if the message has been acknowledged.""" return self._state in ACKNOWLEDGED_STATES + @property + def payload(self): + """The decoded message body.""" + if not self._decoded_cache: + self._decoded_cache = self.decode() + return self._decoded_cache + class Transport(object): """Base class for transports.""" + + #: The :class:`~kombu.connection.BrokerConnection` owning this instance. client = None + + #: Default port used when no port has been specified. default_port = None + + #: Tuple of errors that can happen due to connection failure. connection_errors = () + + #: Tuple of errors that can happen due to channel/method failure. channel_errors = () def __init__(self, client, **kwargs): self.client = client - def create_channel(self, connection): + def establish_connection(self): raise NotImplementedError("Subclass responsibility") - def drain_events(self, connection, **kwargs): + def close_connection(self, connection): raise NotImplementedError("Subclass responsibility") - def establish_connection(self): + def create_channel(self, connection): raise NotImplementedError("Subclass responsibility") - def close_connection(self, connection): + def close_channel(self, connection): + raise NotImplementedError("Subclass responsibility") + + def drain_events(self, connection, **kwargs): raise NotImplementedError("Subclass responsibility") diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 194c07d6..8f08bcde 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -1,3 +1,13 @@ +""" +kombu.transport.memory +====================== + +In-memory transport. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from Queue import Queue from kombu.transport import virtual diff --git a/kombu/transport/pyamqplib.py b/kombu/transport/pyamqplib.py index 8c5e9942..b3521af8 100644 --- a/kombu/transport/pyamqplib.py +++ b/kombu/transport/pyamqplib.py @@ -1,3 +1,13 @@ +""" +kombu.transport.pyamqplib +========================= + +amqplib transport. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" import socket from amqplib import client_0_8 as amqp diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py index 7305cddd..01567bcb 100644 --- a/kombu/transport/pypika.py +++ b/kombu/transport/pypika.py @@ -1,3 +1,13 @@ +""" +kombu.transport.pypika +====================== + +Pika transport. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from pika import asyncore_adapter from pika import blocking_adapter from pika import channel @@ -107,7 +117,6 @@ class SyncTransport(base.Transport): exceptions.UnknownConsumerTag, exceptions.ProtocolSyntaxError) - Message = Message Connection = BlockingConnection diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index 57ef5f1a..1f155a0a 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -1,3 +1,13 @@ +""" +kombu.transport.pyredis +======================= + +Redis transport. + +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" from Queue import Empty from anyjson import serialize, deserialize diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 29557977..38dccb38 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -1,12 +1,13 @@ """ - kombu.transport.virtual - ~~~~~~~~~~~~~~~~~~~~~~~ +kombu.transport.virtual +======================= - Virtual transport implementation. - Emulates the AMQ API for non-AMQ transports. +Virtual transport implementation. - :copyright: (c) 2009 - 2010 by Ask Solem. - :license: BSD, see LICENSE for more details. +Emulates the AMQ API for non-AMQ transports. + +:copyright: (c) 2009, 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. """ import socket @@ -49,9 +50,11 @@ class QoS(object): :param channel: AMQ Channel. :keyword prefetch_count: Initial prefetch count (defaults to 0). - """ + #: current prefetch count value + prefetch_count = 0 + #: :class:`~collections.OrderedDict` of active messages. _delivered = None @@ -155,34 +158,58 @@ class Message(base.Message): class AbstractChannel(object): - """Methods implemented by subclasses of :class:`Channel`.""" + """This is an abstract class defining the channel methods + you'd usually want to implement in a virtual channel. + + Do not subclass directly, but rather inherit from :class:`Channel` + instead. + + """ def _get(self, queue): + """Get next message from `queue`.""" raise NotImplementedError("Virtual channels must implement _get") def _put(self, queue, message): + """Put `message` onto `queue`.""" raise NotImplementedError("Virtual channels must implement _put") def _purge(self, queue): + """Remove all messages from `queue`.""" raise NotImplementedError("Virtual channels must implement _purge") def _size(self, queue): + """Return the number of messages in `queue` as an :class:`int`.""" return 0 def _delete(self, queue): + """Delete `queue`. + + This just purges the queue, if you need to do more you can + override this method. + + """ self._purge(queue) def _new_queue(self, queue, **kwargs): + """Create new queue. + + Some implementations needs to do additiona actions when + the queue is created. You can do so by overriding this + method. + + """ pass def _poll(self, queues): + """Poll a list of queues for available messages.""" return FairCycle(self._get, queues, Empty).get() class Channel(AbstractChannel): """Virtual channel. - :param connection: The connection this channel is part of. + :param connection: The transport instance this channel is part of. """ #: message class used. @@ -208,45 +235,6 @@ class Channel(AbstractChannel): self.exchange_types = dict((typ, cls(self)) for typ, cls in self.exchange_types.items()) - def get_table(self, exchange): - """Get table of bindings for `exchange`.""" - return self.state.exchanges[exchange]["table"] - - def typeof(self, exchange): - """Get the exchange type instance for `exchange`.""" - type = self.state.exchanges[exchange]["type"] - return self.exchange_types[type] - - def _lookup(self, exchange, routing_key, default="ae.undeliver"): - """Find all queues matching `routing_key` for the given `exchange`. - - Returns `default` if no queues matched. - - """ - table = self.get_table(exchange) - try: - return self.typeof(exchange).lookup(table, exchange, - routing_key, default) - except KeyError: - self._new_queue(default) - return [default] - - def _restore(self, message): - """Redeliver message to its original destination.""" - delivery_info = message.delivery_info - message = message.serializable() - message["redelivered"] = True - for queue in self._lookup(delivery_info["exchange"], - delivery_info["routing_key"]): - self._put(queue, message) - - def drain_events(self, timeout=None): - if self._consumers and self.qos.can_consume(): - if hasattr(self, "_get_many"): - return self._get_many(self._active_queues, timeout=timeout) - return self._poll(self._active_queues) - raise Empty() - def exchange_declare(self, exchange, type="direct", durable=False, auto_delete=False, arguments=None, nowait=False): """Declare exchange.""" @@ -305,13 +293,33 @@ class Channel(AbstractChannel): """Remove all ready messages from queue.""" return self._purge(queue) - def basic_qos(self, prefetch_size, prefetch_count, apply_global=False): - """Change QoS settings for this channel. + def basic_publish(self, message, exchange, routing_key, **kwargs): + """Publish message.""" + message["properties"]["delivery_info"]["exchange"] = exchange + message["properties"]["delivery_info"]["routing_key"] = routing_key + message["properties"]["delivery_tag"] = self._next_delivery_tag() + for queue in self._lookup(exchange, routing_key): + self._put(queue, message, **kwargs) - Only `prefetch_count` is supported. + def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): + """Consume from `queue`""" + self._tag_to_queue[consumer_tag] = queue - """ - self.qos.prefetch_count = prefetch_count + def _callback(raw_message): + message = self.Message(self, raw_message) + if not no_ack: + self.qos.append(message, message.delivery_tag) + return callback(message) + + self.connection._callbacks[queue] = _callback + self._consumers.add(consumer_tag) + + def basic_cancel(self, consumer_tag): + """Cancel consumer by consumer tag.""" + self._consumers.remove(consumer_tag) + queue = self._tag_to_queue.pop(consumer_tag, None) + if queue: + self.connection._callbacks.pop(queue, None) def basic_get(self, queue, **kwargs): """Get message by direct access (synchronous).""" @@ -334,33 +342,52 @@ class Channel(AbstractChannel): """Reject message.""" self.qos.reject(delivery_tag, requeue=requeue) - def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): - """Start active consumer.""" - self._tag_to_queue[consumer_tag] = queue + def basic_qos(self, prefetch_size, prefetch_count, apply_global=False): + """Change QoS settings for this channel. - def _callback(raw_message): - message = self.Message(self, raw_message) - if not no_ack: - self.qos.append(message, message.delivery_tag) - return callback(message) + Only `prefetch_count` is supported. - self.connection._callbacks[queue] = _callback - self._consumers.add(consumer_tag) + """ + self.qos.prefetch_count = prefetch_count - def basic_publish(self, message, exchange, routing_key, **kwargs): - """Publish message.""" - message["properties"]["delivery_info"]["exchange"] = exchange - message["properties"]["delivery_info"]["routing_key"] = routing_key - message["properties"]["delivery_tag"] = self._next_delivery_tag() - for queue in self._lookup(exchange, routing_key): - self._put(queue, message, **kwargs) + def get_table(self, exchange): + """Get table of bindings for `exchange`.""" + return self.state.exchanges[exchange]["table"] - def basic_cancel(self, consumer_tag): - """Cancel consumer by consumer tag.""" - self._consumers.remove(consumer_tag) - queue = self._tag_to_queue.pop(consumer_tag, None) - if queue: - self.connection._callbacks.pop(queue, None) + def typeof(self, exchange): + """Get the exchange type instance for `exchange`.""" + type = self.state.exchanges[exchange]["type"] + return self.exchange_types[type] + + def _lookup(self, exchange, routing_key, default="ae.undeliver"): + """Find all queues matching `routing_key` for the given `exchange`. + + Returns `default` if no queues matched. + + """ + table = self.get_table(exchange) + try: + return self.typeof(exchange).lookup(table, exchange, + routing_key, default) + except KeyError: + self._new_queue(default) + return [default] + + def _restore(self, message): + """Redeliver message to its original destination.""" + delivery_info = message.delivery_info + message = message.serializable() + message["redelivered"] = True + for queue in self._lookup(delivery_info["exchange"], + delivery_info["routing_key"]): + self._put(queue, message) + + def drain_events(self, timeout=None): + if self._consumers and self.qos.can_consume(): + if hasattr(self, "_get_many"): + return self._get_many(self._active_queues, timeout=timeout) + return self._poll(self._active_queues) + raise Empty() def message_to_python(self, raw_message): """Convert raw message to :class:`Message` instance.""" @@ -383,6 +410,12 @@ class Channel(AbstractChannel): "properties": properties or {}} def flow(self, active=True): + """Enable/disable message flow. + + :raises NotImplementedError: as flow + is not implemented by the base virtual implementation. + + """ raise NotImplementedError("virtual channels does not support flow.") def close(self): @@ -415,9 +448,20 @@ class Transport(base.Transport): :param client: :class:`~kombu.connection.BrokerConnection` instance """ + #: channel class used. Channel = Channel + + #: cycle class used. Cycle = FairCycle + #: :class:`BrokerState` containing declared exchanges and + #: bindings (set by constructor). + state = None + + #: :class:`~kombu.transport.virtual.scheduling.FairCycle` instance + #: used to fairly drain events from channels (set by constructor). + cycle = None + #: default interval between polling channels for new events. interval = 1 diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index 975460c5..e3c1f836 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -1,13 +1,12 @@ """ - kombu.transport.virtual.exchange - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +kombu.transport.virtual.exchange +================================ - Implementations of the standard exchanges defined - by the AMQ protocol. Currently excluding the `headers` - exchange. +Implementations of the standard exchanges defined +by the AMQ protocol (excluding the `headers` exchange). - :copyright: (c) 2009 - 2010 by Ask Solem. - :license: BSD, see LICENSE for more details. +:copyright: (c) 2009 - 2010 by Ask Solem. +:license: BSD, see LICENSE for more details. """ import re @@ -37,7 +36,7 @@ class ExchangeType(object): raise NotImplementedError("subclass responsibility") def prepare_bind(self, queue, exchange, routing_key, arguments): - """:returns: ``(routing_key, regex, queue)`` tuple to store + """:returns: `(routing_key, regex, queue)` tuple to store for bindings to this exchange.""" return routing_key, None, queue @@ -63,7 +62,7 @@ class DirectExchange(ExchangeType): class TopicExchange(ExchangeType): - """The `topic` exchanges routes based on words separated by dots, and the + """The `topic` exchanges routes based on words separated by dots, and wildcard characters `*` (any single word), and `#` (one or more words).""" #: map of wildcard to regex conversions diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py index ebe2f9a6..e3d4b01a 100644 --- a/kombu/transport/virtual/scheduling.py +++ b/kombu/transport/virtual/scheduling.py @@ -1,3 +1,13 @@ +""" + kombu.transport.virtual.scheduling + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Consumer utilities. + + :copyright: (c) 2009 - 2010 by Ask Solem. + :license: BSD, see LICENSE for more details. + +""" from itertools import count diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 24f967ca..f6752614 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -54,7 +54,7 @@ def repeatlast(it): yield the last value infinitely.""" for item in it: yield item - while 1: # pragma: no cover + while 1: # pragma: no cover yield item diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index a112b65a..85908944 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -65,12 +65,14 @@ # update_wrapper() and wraps() are tools to help write # wrapper functions that can handle naive introspection + def _compat_partial(fun, *args, **kwargs): """New function with partial application of the given arguments and keywords.""" def _curried(*addargs, **addkwargs): - return fun(*(args+addargs), **dict(kwargs, **addkwargs)) + return fun(*(args + addargs), **dict(kwargs, **addkwargs)) + return _curried @@ -81,6 +83,8 @@ except ImportError: WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__') WRAPPER_UPDATES = ('__dict__',) + + def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS, updated=WRAPPER_UPDATES): """Update a wrapper function to look like the wrapped function @@ -98,7 +102,7 @@ def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS, for attr in assigned: try: setattr(wrapper, attr, getattr(wrapped, attr)) - except TypeError: # Python 2.3 doesn't allow assigning to __name__. + except TypeError: # Python 2.3 doesn't allow assigning to __name__. pass for attr in updated: getattr(wrapper, attr).update(getattr(wrapped, attr)) diff --git a/pavement.py b/pavement.py index 5f79ca7e..b0a418fd 100644 --- a/pavement.py +++ b/pavement.py @@ -6,6 +6,7 @@ options( sphinx=Bunch(builddir=".build"), ) + def sphinx_builddir(options): return path("docs") / options.sphinx.builddir / "html" |