summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.rst72
-rw-r--r--docs/_ext/literals_to_xrefs.py4
-rw-r--r--docs/reference/kombu.abstract.rst15
-rw-r--r--docs/reference/kombu.compat.rst41
-rw-r--r--docs/reference/kombu.compression.rst25
-rw-r--r--docs/reference/kombu.connection.rst66
-rw-r--r--docs/reference/kombu.entity.rst72
-rw-r--r--docs/reference/kombu.exceptions.rst20
-rw-r--r--docs/reference/kombu.messaging.rst52
-rw-r--r--docs/reference/kombu.pidbox.rst94
-rw-r--r--docs/reference/kombu.serialization.rst52
-rw-r--r--docs/reference/kombu.simple.rst94
-rw-r--r--docs/reference/kombu.transport.base.rst50
-rw-r--r--docs/reference/kombu.transport.memory.rst25
-rw-r--r--docs/reference/kombu.transport.pyamqplib.rst41
-rw-r--r--docs/reference/kombu.transport.pypika.rst47
-rw-r--r--docs/reference/kombu.transport.pyredis.rst25
-rw-r--r--docs/reference/kombu.transport.rst30
-rw-r--r--docs/reference/kombu.transport.virtual.exchange.rst36
-rw-r--r--docs/reference/kombu.transport.virtual.rst118
-rw-r--r--docs/reference/kombu.transport.virtual.scheduling.rst4
-rw-r--r--docs/userguide/connections.rst6
-rw-r--r--kombu/abstract.py16
-rw-r--r--kombu/compat.py48
-rw-r--r--kombu/compression.py38
-rw-r--r--kombu/connection.py57
-rw-r--r--kombu/entity.py129
-rw-r--r--kombu/exceptions.py16
-rw-r--r--kombu/messaging.py286
-rw-r--r--kombu/pidbox.py93
-rw-r--r--kombu/serialization.py156
-rw-r--r--kombu/simple.py39
-rw-r--r--kombu/tests/test_connection.py2
-rw-r--r--kombu/tests/test_messaging.py6
-rw-r--r--kombu/transport/__init__.py12
-rw-r--r--kombu/transport/base.py78
-rw-r--r--kombu/transport/memory.py10
-rw-r--r--kombu/transport/pyamqplib.py10
-rw-r--r--kombu/transport/pypika.py11
-rw-r--r--kombu/transport/pyredis.py10
-rw-r--r--kombu/transport/virtual/__init__.py196
-rw-r--r--kombu/transport/virtual/exchange.py17
-rw-r--r--kombu/transport/virtual/scheduling.py10
-rw-r--r--kombu/utils/__init__.py2
-rw-r--r--kombu/utils/functional.py8
-rw-r--r--pavement.py1
46 files changed, 1539 insertions, 701 deletions
diff --git a/README.rst b/README.rst
index 3b796254..6f92f9c4 100644
--- a/README.rst
+++ b/README.rst
@@ -64,11 +64,11 @@ Exchanges/Queue can be bound to a channel::
Introduction
------------
-``kombu`` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message
+`kombu` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message
Queuing Protocol, an open standard protocol for message orientation, queuing,
routing, reliability and security.
-The aim of ``kombu`` is to make messaging in Python as easy as possible by
+The aim of `kombu` is to make messaging in Python as easy as possible by
providing a high-level interface for producing and consuming messages. At the
same time it is a goal to re-use what is already available as much as possible.
@@ -81,7 +81,7 @@ Several AMQP message broker implementations exists, including `RabbitMQ`_,
`Apache ActiveMQ`_. You'll need to have one of these installed,
personally we've been using `RabbitMQ`_.
-Before you start playing with ``kombu``, you should probably read up on
+Before you start playing with `kombu`, you should probably read up on
AMQP, and you could start with the excellent article about using RabbitMQ
under Python, `Rabbits and warrens`_. For more detailed information, you can
refer to the `Wikipedia article about AMQP`_.
@@ -108,15 +108,15 @@ Kombu is using Sphinx, and the latest documentation is available at GitHub:
Installation
============
-You can install ``kombu`` either via the Python Package Index (PyPI)
+You can install `kombu` either via the Python Package Index (PyPI)
or from source.
-To install using ``pip``,::
+To install using `pip`,::
$ pip install kombu
-To install using ``easy_install``,::
+To install using `easy_install`,::
$ easy_install kombu
@@ -167,7 +167,7 @@ There are some concepts you should be familiar with before starting:
* Direct exchange
Matches if the routing key property of the message and
- the ``routing_key`` attribute of the consumer are identical.
+ the `routing_key` attribute of the consumer are identical.
* Fan-out exchange
@@ -178,12 +178,12 @@ There are some concepts you should be familiar with before starting:
Matches the routing key property of the message by a primitive
pattern matching scheme. The message routing key then consists
- of words separated by dots (``"."``, like domain names), and
- two special characters are available; star (``"*"``) and hash
- (``"#"``). The star matches any word, and the hash matches
- zero or more words. For example ``"*.stock.#"`` matches the
- routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not
- ``"stock.nasdaq"``.
+ of words separated by dots (`"."`, like domain names), and
+ two special characters are available; star (`"*"`) and hash
+ (`"#"`). The star matches any word, and the hash matches
+ zero or more words. For example `"*.stock.#"` matches the
+ routing keys `"usd.stock"` and `"eur.stock.db"` but not
+ `"stock.nasdaq"`.
Examples
@@ -193,7 +193,7 @@ Creating a connection
---------------------
You can set up a connection by creating an instance of
- ``kombu.BrokerConnection``, with the appropriate options for
+ `kombu.BrokerConnection`, with the appropriate options for
your broker:
>>> from kombu import BrokerConnection
@@ -207,8 +207,8 @@ Receiving messages using a Consumer
First we open up a Python shell and start a message consumer.
-This consumer declares a queue named ``"feed"``, receiving messages with
-the routing key ``"importer"`` from the ``"feed"`` exchange.
+This consumer declares a queue named `"feed"`, receiving messages with
+the routing key `"importer"` from the `"feed"` exchange.
>>> from kombu import Exchange, Queue, Consumer
@@ -258,7 +258,7 @@ Serialization of Data
By default every message is encoded using `JSON`_, so sending
Python data structures like dictionaries and lists works.
-`YAML`_, `msgpack`_ and Python's built-in ``pickle`` module is also supported,
+`YAML`_, `msgpack`_ and Python's built-in `pickle` module is also supported,
and if needed you can register any custom serialization scheme you
want to use.
@@ -268,12 +268,12 @@ want to use.
Each option has its advantages and disadvantages.
-``json`` -- JSON is supported in many programming languages, is now
+`json` -- JSON is supported in many programming languages, is now
a standard part of Python (since 2.6), and is fairly fast to
- decode using the modern Python libraries such as ``cjson or
- ``simplejson``.
+ decode using the modern Python libraries such as `cjson` or
+ `simplejson`.
- The primary disadvantage to ``JSON`` is that it limits you to
+ The primary disadvantage to `JSON` is that it limits you to
the following data types: strings, unicode, floats, boolean,
dictionaries, and lists. Decimals and dates are notably missing.
@@ -282,16 +282,16 @@ Each option has its advantages and disadvantages.
encoding which supports native binary types.
However, if your data fits inside the above constraints and
- you need cross-language support, the default setting of ``JSON``
+ you need cross-language support, the default setting of `JSON`
is probably your best choice.
-``pickle`` -- If you have no desire to support any language other than
- Python, then using the ``pickle`` encoding will gain you
+`pickle` -- If you have no desire to support any language other than
+ Python, then using the `pickle` encoding will gain you
the support of all built-in Python data types (except class instances),
smaller messages when sending binary files, and a slight speedup
- over ``JSON`` processing.
+ over `JSON` processing.
-``yaml`` -- YAML has many of the same characteristics as ``json``,
+`yaml` -- YAML has many of the same characteristics as `json`,
except that it natively supports more data types (including dates,
recursive references, etc.)
@@ -299,7 +299,7 @@ Each option has its advantages and disadvantages.
than the libraries for JSON.
If you need a more expressive set of data types and need to maintain
- cross-language compatibility, then ``YAML`` may be a better fit
+ cross-language compatibility, then `YAML` may be a better fit
than the above.
To instruct carrot to use an alternate serialization method,
@@ -316,7 +316,7 @@ use one of the following options.
>>> producer.publish(message, routing_key=rkey,
... serializer="pickle")
-Note that a ``Consumer`` do not need the serialization method specified.
+Note that a `Consumer` do not need the serialization method specified.
They can auto-detect the serialization method as the
content-type is sent as a message header.
@@ -327,7 +327,7 @@ In some cases, you don't need your message data to be serialized. If you
pass in a plain string or unicode object as your message, then carrot will
not waste cycles serializing/deserializing the data.
-You can optionally specify a ``content_type`` and ``content_encoding``
+You can optionally specify a `content_type` and `content_encoding`
for the raw data:
>>> producer.send(open('~/my_picture.jpg','rb').read(),
@@ -335,15 +335,15 @@ for the raw data:
content_encoding="binary",
routing_key=rkey)
-The ``Message`` object returned by the ``Consumer`` class will have a
-``content_type`` and ``content_encoding`` attribute.
+The `Message` object returned by the `Consumer` class will have a
+`content_type` and `content_encoding` attribute.
Receiving messages without a callback
--------------------------------------
-You can also poll the queue manually, by using the ``get`` method.
-This method returns a ``Message`` object, from where you can get the
+You can also poll the queue manually, by using the `get` method.
+This method returns a `Message` object, from where you can get the
message body, de-serialize the body to get the data, acknowledge, reject or
re-queue the message.
@@ -359,7 +359,7 @@ re-queue the message.
Sub-classing the messaging classes
----------------------------------
-The ``Consumer``, and ``Producer`` classes can also be sub classed. Thus you
+The `Consumer`, and `Producer` classes can also be sub classed. Thus you
can define the above producer and consumer like so:
>>> class FeedProducer(Producer):
@@ -409,7 +409,7 @@ to our issue tracker at http://github.com/ask/kombu/issues/
Contributing
============
-Development of ``kombu`` happens at Github: http://github.com/ask/kombu
+Development of `kombu` happens at Github: http://github.com/ask/kombu
You are highly encouraged to participate in the development. If you don't
like Github (for some reason) you're welcome to send regular patches.
@@ -417,5 +417,5 @@ like Github (for some reason) you're welcome to send regular patches.
License
=======
-This software is licensed under the ``New BSD License``. See the ``LICENSE``
+This software is licensed under the `New BSD License`. See the :file:`LICENSE`
file in the top distribution directory for the full license text.
diff --git a/docs/_ext/literals_to_xrefs.py b/docs/_ext/literals_to_xrefs.py
index 9a220028..e9dc7ca0 100644
--- a/docs/_ext/literals_to_xrefs.py
+++ b/docs/_ext/literals_to_xrefs.py
@@ -68,8 +68,8 @@ def fixliterals(fname):
new.append(m.group(0))
continue
- sys.stdout.write("\n"+"-"*80+"\n")
- sys.stdout.write(data[prev_start+1:m.start()])
+ sys.stdout.write("\n" + "-" * 80 + "\n")
+ sys.stdout.write(data[prev_start + 1:m.start()])
sys.stdout.write(colorize(m.group(0), fg="red"))
sys.stdout.write(data[m.end():next_end])
sys.stdout.write("\n\n")
diff --git a/docs/reference/kombu.abstract.rst b/docs/reference/kombu.abstract.rst
index bf9334d7..0669a5a8 100644
--- a/docs/reference/kombu.abstract.rst
+++ b/docs/reference/kombu.abstract.rst
@@ -1,11 +1,10 @@
-==========================================================
- Object Utilities - kombu.abstract
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.abstract
.. automodule:: kombu.abstract
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ .. autoclass:: MaybeChannelBound
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.compat.rst b/docs/reference/kombu.compat.rst
index 860a0ff6..ffb4793f 100644
--- a/docs/reference/kombu.compat.rst
+++ b/docs/reference/kombu.compat.rst
@@ -1,11 +1,36 @@
-==========================================================
- Carrot Compatible API - kombu.compat
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.compat
.. automodule:: kombu.compat
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Publisher
+ ---------
+
+ Replace with :class:`kombu.messaging.Producer`.
+
+ .. autoclass:: Publisher
+ :members:
+ :undoc-members:
+ :inherited-members:
+
+ Consumer
+ --------
+
+ Replace with :class:`kombu.messaging.Consumer`.
+
+ .. autoclass:: Consumer
+ :members:
+ :undoc-members:
+ :inherited-members:
+
+ ConsumerSet
+ -----------
+
+ Replace with :class:`kombu.messaging.Consumer`.
+
+ .. autoclass:: ConsumerSet
+ :members:
+ :undoc-members:
+ :inherited-members:
diff --git a/docs/reference/kombu.compression.rst b/docs/reference/kombu.compression.rst
index 446889c2..c7748577 100644
--- a/docs/reference/kombu.compression.rst
+++ b/docs/reference/kombu.compression.rst
@@ -1,11 +1,20 @@
-==========================================================
- Compression - kombu.compression
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.compression
.. automodule:: kombu.compression
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Encoding/decoding
+ -----------------
+
+ .. autofunction:: compress
+ .. autofunction:: decompress
+
+ Registry
+ --------
+
+ .. autofunction:: encoders
+ .. autofunction:: get_encoder
+ .. autofunction:: get_decoder
+ .. autofunction:: register
diff --git a/docs/reference/kombu.connection.rst b/docs/reference/kombu.connection.rst
index 9313c172..511295bc 100644
--- a/docs/reference/kombu.connection.rst
+++ b/docs/reference/kombu.connection.rst
@@ -1,11 +1,63 @@
-==========================================================
- Broker Connections - kombu.connection
-==========================================================
-.. contents::
- :local:
+
.. currentmodule:: kombu.connection
.. automodule:: kombu.connection
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Connection
+ ----------
+
+ .. autoclass:: BrokerConnection
+
+ **ATTRIBUTES**
+
+ .. autoattribute:: connection_errors
+ .. autoattribute:: channel_errors
+ .. autoattribute:: transport
+ .. autoattribute:: host
+ .. autoattribute:: connection
+
+ **METHODS**
+
+ .. automethod:: connect
+ .. automethod:: channel
+ .. automethod:: drain_events
+ .. automethod:: release
+ .. automethod:: ensure_connection
+ .. automethod:: ensure
+ .. automethod:: create_transport
+ .. automethod:: get_transport_cls
+ .. automethod:: clone
+ .. automethod:: info
+
+ .. automethod:: Pool
+ .. automethod:: ChannelPool
+ .. automethod:: SimpleQueue
+ .. automethod:: SimpleBuffer
+
+
+ Pools
+ -----
+
+ .. seealso::
+
+ The shortcut methods :meth:`BrokerConnection.Pool` and
+ :meth:`BrokerConnection.ChannelPool` is the recommended way
+ to instantiate these classes.
+
+ .. autoclass:: ConnectionPool
+
+ .. autoattribute:: LimitExceeded
+
+ .. automethod:: acquire
+ .. automethod:: release
+
+ .. autoclass:: ChannelPool
+
+ .. autoattribute:: LimitExceeded
+
+ .. automethod:: acquire
+ .. automethod:: release
diff --git a/docs/reference/kombu.entity.rst b/docs/reference/kombu.entity.rst
index 2bb6f72a..79da6ea4 100644
--- a/docs/reference/kombu.entity.rst
+++ b/docs/reference/kombu.entity.rst
@@ -1,11 +1,67 @@
-==========================================================
- AMQP Entities - kombu.entity
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.entity
.. automodule:: kombu.entity
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Exchange
+ --------
+
+ Example creating an exchange declaration::
+
+ >>> news_exchange = Exchange("news", type="topic")
+
+ For now `news_exchange` is just a declaration, you can't perform
+ actions on it. It just describes the name and options for the exchange.
+
+ The exchange can be bound or unbound. Bound means the exchange is
+ associated with a channel and operations can be performed on it.
+ To bind the exchange you call the exchange with the channel as argument::
+
+ >>> bound_exchange = news_exchange(channel)
+
+ Now you can perform operations like :meth:`declare` or :meth:`delete`::
+
+ >>> bound_exchange.declare()
+ >>> message = bound_exchange.Message("Cure for cancer found!")
+ >>> bound_exchange.publish(message, routing_key="news.science")
+ >>> bound_exchange.delete()
+
+ .. autoclass:: Exchange
+ :members:
+ :undoc-members:
+
+ .. automethod:: maybe_bind
+
+ Queue
+ -----
+
+ Example creating a queue using our exchange in the :class:`Exchange`
+ example::
+
+ >>> science_news = Queue("science_news",
+ ... exchange=news_exchange,
+ ... routing_key="news.science")
+
+ For now `science_news` is just a declaration, you can't perform
+ actions on it. It just describes the name and options for the queue.
+
+ The queue can be bound or unbound. Bound means the queue is
+ associated with a channel and operations can be performed on it.
+ To bind the queue you call the queue instance with the channel as
+ an argument::
+
+ >>> bound_science_news = science_news(channel)
+
+ Now you can perform operations like :meth:`declare` or :meth:`purge`::
+
+ >>> bound_sicence_news.declare()
+ >>> bound_science_news.purge()
+ >>> bound_science_news.delete()
+
+ .. autoclass:: Queue
+ :members:
+ :undoc-members:
+
+ .. automethod:: maybe_bind
diff --git a/docs/reference/kombu.exceptions.rst b/docs/reference/kombu.exceptions.rst
index 9bf2cec5..d0b4bd50 100644
--- a/docs/reference/kombu.exceptions.rst
+++ b/docs/reference/kombu.exceptions.rst
@@ -1,11 +1,15 @@
-==========================================================
- Exceptions - kombu.exceptions
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.exceptions
.. automodule:: kombu.exceptions
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ .. autoexception:: NotBoundError
+ .. autoexception:: MessageStateError
+ .. autoexception:: EnsureExhausted
+ .. autoexception:: TimeoutError
+ .. autoexception:: LimitExceeded
+ .. autoexception:: ConnectionLimitExceeded
+ .. autoexception:: ChannelLimitExceeded
+
diff --git a/docs/reference/kombu.messaging.rst b/docs/reference/kombu.messaging.rst
index cf43440b..9b68d128 100644
--- a/docs/reference/kombu.messaging.rst
+++ b/docs/reference/kombu.messaging.rst
@@ -1,11 +1,47 @@
-==========================================================
- Messaging - kombu.messaging
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.messaging
.. automodule:: kombu.messaging
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Message Producer
+ ----------------
+
+ .. autoclass:: Producer
+
+ .. autoattribute:: channel
+ .. autoattribute:: exchange
+ .. autoattribute:: routing_key
+ .. autoattribute:: serializer
+ .. autoattribute:: compression
+ .. autoattribute:: auto_declare
+ .. autoattribute:: on_return
+
+ .. automethod:: declare
+ .. automethod:: publish
+ .. automethod:: revive
+
+ Message Consumer
+ ----------------
+
+ .. autoclass:: Consumer
+
+ .. autoattribute:: channel
+ .. autoattribute:: queues
+ .. autoattribute:: no_ack
+ .. autoattribute:: auto_declare
+ .. autoattribute:: callbacks
+ .. autoattribute:: on_decode_error
+
+ .. automethod:: declare
+ .. automethod:: register_callback
+ .. automethod:: consume
+ .. automethod:: cancel
+ .. automethod:: cancel_by_queue
+ .. automethod:: purge
+ .. automethod:: flow
+ .. automethod:: qos
+ .. automethod:: recover
+ .. automethod:: receive
+ .. automethod:: revive
diff --git a/docs/reference/kombu.pidbox.rst b/docs/reference/kombu.pidbox.rst
index 1a8dba5f..f47bfd4c 100644
--- a/docs/reference/kombu.pidbox.rst
+++ b/docs/reference/kombu.pidbox.rst
@@ -1,11 +1,89 @@
-================================
- Process Mailbox - kombu.pidbox
-================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.pidbox
.. automodule:: kombu.pidbox
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Introduction
+ ------------
+
+ Creating the applications Mailbox
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. code-block:: python
+
+ >>> mailbox = pidbox.Mailbox("celerybeat", type="direct")
+
+ >>> @mailbox.handler
+ >>> def reload_schedule(state, **kwargs):
+ ... state["beat"].reload_schedule()
+
+ >>> @mailbox.handler
+ >>> def connection_info(state, **kwargs):
+ ... return {"connection": state["connection"].info()}
+
+ Example Node
+ ~~~~~~~~~~~~
+
+ .. code-block:: python
+
+ >>> connection = kombu.BrokerConnection()
+ >>> state = {"beat": beat,
+ "connection": connection}
+ >>> consumer = mailbox(connection).Node(hostname).listen()
+ >>> try:
+ ... while True:
+ ... connection.drain_events(timeout=1)
+ ... finally:
+ ... consumer.cancel()
+
+ Example Client
+ ~~~~~~~~~~~~~~
+
+ .. code-block:: python
+
+ >>> mailbox.cast("reload_schedule") # cast is async.
+ >>> info = celerybeat.call("connection_info", timeout=1)
+
+ Mailbox
+ -------
+
+ .. autoclass:: Mailbox
+
+ .. autoattribute:: namespace
+ .. autoattribute:: connection
+ .. autoattribute:: type
+ .. autoattribute:: exchange
+ .. autoattribute:: reply_exchange
+
+ .. automethod:: Node
+ .. automethod:: call
+ .. automethod:: cast
+ .. automethod:: abcast
+ .. automethod:: multi_call
+ .. automethod:: get_reply_queue
+ .. automethod:: get_queue
+
+ Node
+ ----
+
+ .. autoclass:: Node
+
+ .. autoattribute:: hostname
+ .. autoattribute:: mailbox
+ .. autoattribute:: handlers
+ .. autoattribute:: state
+ .. autoattribute:: channel
+
+ .. automethod:: Consumer
+ .. automethod:: handler
+ .. automethod:: listen
+ .. automethod:: dispatch
+ .. automethod:: dispatch_from_message
+ .. automethod:: handle_call
+ .. automethod:: handle_cast
+ .. automethod:: handle
+ .. automethod:: handle_message
+ .. automethod:: reply
+
diff --git a/docs/reference/kombu.serialization.rst b/docs/reference/kombu.serialization.rst
index 8e21998a..7ed56ff9 100644
--- a/docs/reference/kombu.serialization.rst
+++ b/docs/reference/kombu.serialization.rst
@@ -1,11 +1,47 @@
-==========================================================
- Serialization - kombu.serialization
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.serialization
.. automodule:: kombu.serialization
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Overview
+ --------
+
+ Centralized support for encoding/decoding of data structures.
+ Contains json, pickle, msgpack, and yaml serializers.
+
+ Optionally installs support for YAML if the `PyYAML`_ package
+ is installed.
+
+ Optionally installs support for `msgpack`_ if the `msgpack-python`_
+ package is installed.
+
+
+ Exceptions
+ ----------
+
+ .. autoexception:: SerializerNotInstalled
+
+ Serialization
+ -------------
+
+ .. autofunction:: encode
+
+ .. autofunction:: decode
+
+ .. autofunction:: raw_encode
+
+ Registry
+ --------
+
+ .. autofunction:: register
+
+ .. autodata:: registry
+
+.. _`cjson`: http://pypi.python.org/pypi/python-cjson/
+.. _`simplejson`: http://code.google.com/p/simplejson/
+.. _`Python 2.6+`: http://docs.python.org/library/json.html
+.. _`PyYAML`: http://pyyaml.org/
+.. _`msgpack`: http://msgpack.sourceforge.net/
+.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/
diff --git a/docs/reference/kombu.simple.rst b/docs/reference/kombu.simple.rst
index 95ecd2de..b981f6dd 100644
--- a/docs/reference/kombu.simple.rst
+++ b/docs/reference/kombu.simple.rst
@@ -1,11 +1,89 @@
-==========================================================
- Simple Interface - kombu.simple
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.simple
.. automodule:: kombu.simple
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Persistent
+ ----------
+
+ .. autoclass:: SimpleQueue
+
+ .. attribute:: channel
+
+ Current channel
+
+ .. attribute:: producer
+
+ :class:`~kombu.messaging.Producer` used to publish messages.
+
+ .. attribute:: consumer
+
+ :class:`~kombu.messaging.Consumer` used to receive messages.
+
+ .. attribute:: no_ack
+
+ flag to enable/disable acknowledgements.
+
+ .. attribute:: queue
+
+ :class:`~kombu.entity.Queue` to consume from (if consuming).
+
+ .. attribute:: queue_opts
+
+ Additional options for the queue declaration.
+
+ .. attribute:: exchange_opts
+
+ Additional options for the exchange declaration.
+
+ .. automethod:: get
+ .. automethod:: get_nowait
+ .. automethod:: put
+ .. automethod:: clear
+ .. automethod:: __len__
+ .. automethod:: qsize
+ .. automethod:: close
+
+ Buffer
+ ------
+
+ .. autoclass:: SimpleBuffer
+
+ .. attribute:: channel
+
+ Current channel
+
+ .. attribute:: producer
+
+ :class:`~kombu.messaging.Producer` used to publish messages.
+
+ .. attribute:: consumer
+
+ :class:`~kombu.messaging.Consumer` used to receive messages.
+
+ .. attribute:: no_ack
+
+ flag to enable/disable acknowledgements.
+
+ .. attribute:: queue
+
+ :class:`~kombu.entity.Queue` to consume from (if consuming).
+
+ .. attribute:: queue_opts
+
+ Additional options for the queue declaration.
+
+ .. attribute:: exchange_opts
+
+ Additional options for the exchange declaration.
+
+ .. automethod:: get
+ .. automethod:: get_nowait
+ .. automethod:: put
+ .. automethod:: clear
+ .. automethod:: __len__
+ .. automethod:: qsize
+ .. automethod:: close
+
diff --git a/docs/reference/kombu.transport.base.rst b/docs/reference/kombu.transport.base.rst
index fef20eaa..58490138 100644
--- a/docs/reference/kombu.transport.base.rst
+++ b/docs/reference/kombu.transport.base.rst
@@ -1,11 +1,45 @@
-==========================================================
- Base Transport Interface - kombu.transport.base
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.transport.base
.. automodule:: kombu.transport.base
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Message
+ -------
+
+ .. autoclass:: Message
+
+ .. autoattribute:: payload
+ .. autoattribute:: channel
+ .. autoattribute:: delivery_tag
+ .. autoattribute:: content_type
+ .. autoattribute:: content_encoding
+ .. autoattribute:: delivery_info
+ .. autoattribute:: headers
+ .. autoattribute:: properties
+ .. autoattribute:: body
+ .. autoattribute:: acknowledged
+
+ .. automethod:: ack
+ .. automethod:: reject
+ .. automethod:: requeue
+ .. automethod:: decode
+
+ Transport
+ ---------
+
+ .. autoclass:: Transport
+
+ .. autoattribute:: client
+ .. autoattribute:: default_port
+ .. autoattribute:: connection_errors
+ .. autoattribute:: channel_errors
+
+ .. automethod:: establish_connection
+ .. automethod:: close_connection
+ .. automethod:: create_channel
+ .. automethod:: close_channel
+ .. automethod:: drain_events
+
+
diff --git a/docs/reference/kombu.transport.memory.rst b/docs/reference/kombu.transport.memory.rst
index e8ba5b33..c712b135 100644
--- a/docs/reference/kombu.transport.memory.rst
+++ b/docs/reference/kombu.transport.memory.rst
@@ -1,11 +1,20 @@
-==========================================================
- In-memory Transport - kombu.transport.memory
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.transport.memory
.. automodule:: kombu.transport.memory
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Transport
+ ---------
+
+ .. autoclass:: Transport
+ :members:
+ :undoc-members:
+
+ Channel
+ -------
+
+ .. autoclass:: Channel
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.transport.pyamqplib.rst b/docs/reference/kombu.transport.pyamqplib.rst
index 9dbbe516..ab35d156 100644
--- a/docs/reference/kombu.transport.pyamqplib.rst
+++ b/docs/reference/kombu.transport.pyamqplib.rst
@@ -1,11 +1,36 @@
-==========================================================
- amqplib Transport - kombu.transport.pyamqplib
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.transport.pyamqplib
.. automodule:: kombu.transport.pyamqplib
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Transport
+ ---------
+
+ .. autoclass:: Transport
+ :members:
+ :undoc-members:
+
+ Connection
+ ----------
+
+ .. autoclass:: Connection
+ :members:
+ :undoc-members:
+ :inherited-members:
+
+ Channel
+ -------
+
+ .. autoclass:: Channel
+ :members:
+ :undoc-members:
+
+ Message
+ -------
+
+ .. autoclass:: Message
+ :members:
+ :undoc-members:
+
diff --git a/docs/reference/kombu.transport.pypika.rst b/docs/reference/kombu.transport.pypika.rst
index a7a0c8c0..2c9af997 100644
--- a/docs/reference/kombu.transport.pypika.rst
+++ b/docs/reference/kombu.transport.pypika.rst
@@ -1,11 +1,42 @@
-==========================================================
- pika Transport - kombu.transport.pypika
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.transport.pypika
.. automodule:: kombu.transport.pypika
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Transports
+ ----------
+
+ .. autoclass:: AsyncoreTransport
+ :members:
+ :undoc-members:
+
+ .. autoclass:: SyncTransport
+ :members:
+ :undoc-members:
+
+ Connections
+ -----------
+
+ .. autoclass:: AsyncoreConnection
+ :members:
+ :undoc-members:
+
+ .. autoclass:: BlockingConnection
+ :members:
+ :undoc-members:
+
+ Channel
+ -------
+
+ .. autoclass:: Channel
+ :members:
+ :undoc-members:
+
+ Message
+ -------
+
+ .. autoclass:: Message
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.transport.pyredis.rst b/docs/reference/kombu.transport.pyredis.rst
index 0e2b489b..8e7e564c 100644
--- a/docs/reference/kombu.transport.pyredis.rst
+++ b/docs/reference/kombu.transport.pyredis.rst
@@ -1,11 +1,20 @@
-==========================================================
- Redis Transport - kombu.transport.pyredis
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.transport.pyredis
.. automodule:: kombu.transport.pyredis
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Transport
+ ---------
+
+ .. autoclass:: Transport
+ :members:
+ :undoc-members:
+
+ Channel
+ -------
+
+ .. autoclass:: Channel
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.transport.rst b/docs/reference/kombu.transport.rst
index 6c9c209c..51dd7ec3 100644
--- a/docs/reference/kombu.transport.rst
+++ b/docs/reference/kombu.transport.rst
@@ -1,11 +1,23 @@
-==========================================================
- Transports - kombu.transport
-==========================================================
-
-.. contents::
- :local:
.. currentmodule:: kombu.transport
-.. automodule:: kombu.transport
- :members:
- :undoc-members:
+.. automodule:: kombu.transpor
+
+ .. contents::
+ :local:
+
+ Data
+ ----
+
+ .. data:: DEFAULT_TRANSPORT
+
+ Default transport used when no transport specified.
+
+ .. data:: TRANSPORT_ALIASES
+
+ Mapping of transport aliases/class names.
+
+ Functions
+ ---------
+
+ .. autofunction:: get_transport_cls
+ .. autofunction:: resolve_transport
diff --git a/docs/reference/kombu.transport.virtual.exchange.rst b/docs/reference/kombu.transport.virtual.exchange.rst
index c775a463..220b0176 100644
--- a/docs/reference/kombu.transport.virtual.exchange.rst
+++ b/docs/reference/kombu.transport.virtual.exchange.rst
@@ -1,7 +1,35 @@
-.. contents::
- :local:
.. currentmodule:: kombu.transport.virtual.exchange
.. automodule:: kombu.transport.virtual.exchange
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Direct
+ ------
+
+ .. autoclass:: DirectExchange
+ :members:
+ :undoc-members:
+
+ Topic
+ -----
+
+ .. autoclass:: TopicExchange
+ :members:
+ :undoc-members:
+
+ Fanout
+ ------
+
+ .. autoclass:: FanoutExchange
+ :members:
+ :undoc-members:
+
+ Interface
+ ---------
+
+ .. autoclass:: ExchangeType
+ :members:
+ :undoc-members:
+
diff --git a/docs/reference/kombu.transport.virtual.rst b/docs/reference/kombu.transport.virtual.rst
index d7f5381e..8e60977a 100644
--- a/docs/reference/kombu.transport.virtual.rst
+++ b/docs/reference/kombu.transport.virtual.rst
@@ -1,7 +1,117 @@
-.. contents::
- :local:
.. currentmodule:: kombu.transport.virtual
.. automodule:: kombu.transport.virtual
- :members:
- :undoc-members:
+
+ .. contents::
+ :local:
+
+ Transports
+ ----------
+
+ .. autoclass:: Transport
+
+ .. autoattribute:: Channel
+
+ .. autoattribute:: Cycle
+
+ .. autoattribute:: interval
+
+ .. autoattribute:: default_port
+
+ .. autoattribute:: state
+
+ .. autoattribute:: cycle
+
+ .. automethod:: establish_connection
+
+ .. automethod:: close_connection
+
+ .. automethod:: create_channel
+
+ .. automethod:: close_channel
+
+ .. automethod:: drain_events
+
+ Channel
+ -------
+
+ .. autoclass:: AbstractChannel
+ :members:
+
+ .. autoclass:: Channel
+
+ .. autoattribute:: Message
+
+ .. autoattribute:: state
+
+ .. autoattribute:: qos
+
+ .. autoattribute:: do_restore
+
+ .. autoattribute:: exchange_types
+
+ .. automethod:: exchange_declare
+
+ .. automethod:: exchange_delete
+
+ .. automethod:: queue_declare
+
+ .. automethod:: queue_delete
+
+ .. automethod:: queue_bind
+
+ .. automethod:: queue_purge
+
+ .. automethod:: basic_publish
+
+ .. automethod:: basic_consume
+
+ .. automethod:: basic_cancel
+
+ .. automethod:: basic_get
+
+ .. automethod:: basic_ack
+
+ .. automethod:: basic_recover
+
+ .. automethod:: basic_reject
+
+ .. automethod:: basic_qos
+
+ .. automethod:: get_table
+
+ .. automethod:: typeof
+
+ .. automethod:: drain_events
+
+ .. automethod:: prepare_message
+
+ .. automethod:: message_to_python
+
+ .. automethod:: flow
+
+ .. automethod:: close
+
+ Message
+ -------
+
+ .. autoclass:: Message
+ :members:
+ :undoc-members:
+ :inherited-members:
+
+ Quality Of Service
+ ------------------
+
+ .. autoclass:: QoS
+ :members:
+ :undoc-members:
+ :inherited-members:
+
+ In-memory State
+ ---------------
+
+ .. autoclass:: BrokerState
+ :members:
+ :undoc-members:
+ :inherited-members:
diff --git a/docs/reference/kombu.transport.virtual.scheduling.rst b/docs/reference/kombu.transport.virtual.scheduling.rst
index b59994ab..5eca4b91 100644
--- a/docs/reference/kombu.transport.virtual.scheduling.rst
+++ b/docs/reference/kombu.transport.virtual.scheduling.rst
@@ -1,7 +1,7 @@
.. contents::
:local:
-.. currentmodule:: kombu.transport.scheduling
+.. currentmodule:: kombu.transport.virtual.scheduling
-.. automodule:: kombu.transport.scheduling
+.. automodule:: kombu.transport.virtual.scheduling
:members:
:undoc-members:
diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst
index 37e516c3..7c495cfa 100644
--- a/docs/userguide/connections.rst
+++ b/docs/userguide/connections.rst
@@ -19,8 +19,8 @@ method::
>>> connection.connect()
This connection will use the default connection settings, which is using
-the localhost host, default port, username ``guest``,
-password ``guest`` and virtual host "/". A connection without arguments
+the localhost host, default port, username `guest`,
+password `guest` and virtual host "/". A connection without arguments
is the same as::
>>> BrokerConnection(hostname="localhost",
@@ -32,7 +32,7 @@ is the same as::
The default port is transport specific, for AMQP this is 6379.
Other fields may also have different meaning depending on the transport
-used. For example, the Redis transport uses the ``virtual_host`` argument as
+used. For example, the Redis transport uses the `virtual_host` argument as
the redis database number.
See the :class:`~kombu.connection.BrokerConnection` reference documentation
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 50c70011..5680c4f4 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -1,9 +1,21 @@
+"""
+kombu.compression
+=================
+
+Object utilities.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from copy import copy
from kombu.exceptions import NotBoundError
class Object(object):
+ """Common baseclass supporting automatic kwargs->attributes handling,
+ and cloning."""
attrs = ()
def __init__(self, *args, **kwargs):
@@ -29,6 +41,7 @@ class MaybeChannelBound(Object):
_is_bound = False
def __call__(self, channel):
+ """`self(channel) -> self.bind(channel)`"""
return self.bind(channel)
def bind(self, channel):
@@ -65,11 +78,12 @@ class MaybeChannelBound(Object):
@property
def is_bound(self):
- """Returns ``True`` if the entity is bound."""
+ """Flag set if the channel is bound."""
return self._is_bound and self._channel is not None
@property
def channel(self):
+ """Current channel if the object is bound."""
if self._channel is None:
raise NotBoundError(
"Can't call method on %s not bound to a channel" % (
diff --git a/kombu/compat.py b/kombu/compat.py
index cef10af6..a8dfc6c7 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -1,10 +1,22 @@
+"""
+kombu.compat
+============
+
+Carrot compatible interface for :class:`Publisher` and :class:`Producer`.
+
+See http://packages.python.org/pypi/carrot for documentation.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from itertools import count
from kombu import entity
from kombu import messaging
-def iterconsume(connection, consumer, no_ack=False, limit=None):
+def _iterconsume(connection, consumer, no_ack=False, limit=None):
consumer.consume(no_ack=no_ack)
for iteration in count(0):
if limit and iteration >= limit:
@@ -161,7 +173,7 @@ class Consumer(messaging.Consumer):
return self.purge()
def iterconsume(self, limit=None, no_ack=None):
- return iterconsume(self.connection, self, no_ack, limit)
+ return _iterconsume(self.connection, self, no_ack, limit)
def wait(self, limit=None):
it = self.iterconsume(limit)
@@ -176,15 +188,25 @@ class Consumer(messaging.Consumer):
yield item
-class _CSet(messaging.Consumer):
+class ConsumerSet(messaging.Consumer):
- def __init__(self, connection, *args, **kwargs):
+ def __init__(self, connection, from_dict=None, consumers=None,
+ callbacks=None, **kwargs):
self.connection = connection
self.backend = connection.channel()
- super(_CSet, self).__init__(self.backend, *args, **kwargs)
+
+ queues = []
+ if consumers:
+ for consumer in consumers:
+ map(queues.extend, consumer.queues)
+ if from_dict:
+ for queue_name, queue_options in from_dict.items():
+ queues.append(entry_to_queue(queue_name, **queue_options))
+
+ super(_CSet, self).__init__(self.backend, queues, **kwargs)
def iterconsume(self, limit=None, no_ack=False):
- return iterconsume(self.connection, self, no_ack, limit)
+ return _iterconsume(self.connection, self, no_ack, limit)
def discard_all(self):
return self.purge()
@@ -200,17 +222,3 @@ class _CSet(messaging.Consumer):
def close(self):
self.cancel()
self.channel.close()
-
-
-def ConsumerSet(connection, from_dict=None, consumers=None,
- callbacks=None, **kwargs):
-
- queues = []
- if consumers:
- for consumer in consumers:
- map(queues.extend, consumer.queues)
- if from_dict:
- for queue_name, queue_options in from_dict.items():
- queues.append(entry_to_queue(queue_name, **queue_options))
-
- return _CSet(connection, queues, **kwargs)
diff --git a/kombu/compression.py b/kombu/compression.py
index ebe9ee96..e6f04702 100644
--- a/kombu/compression.py
+++ b/kombu/compression.py
@@ -1,29 +1,67 @@
+"""
+kombu.compression
+=================
+
+Compression utilities.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
+
_aliases = {}
_encoders = {}
_decoders = {}
def register(encoder, decoder, content_type, aliases=[]):
+ """Register new compression method.
+
+ :param encoder: Function used to compress text.
+ :param decoder: Function used to decompress previously compressed text.
+ :param content_type: The mime type this compression method identifies as.
+ :param aliases: A list of names to associate with this compression method.
+
+ """
_encoders[content_type] = encoder
_decoders[content_type] = decoder
_aliases.update((alias, content_type) for alias in aliases)
+def encoders():
+ """Returns a list of available compression methods."""
+ return _encoders.keys()
+
+
def get_encoder(t):
+ """Get encoder by alias name."""
t = _aliases.get(t, t)
return _encoders[t], t
def get_decoder(t):
+ """Get decoder by alias name."""
return _decoders[_aliases.get(t, t)]
def compress(body, content_type):
+ """Compress text.
+
+ :param body: The text to compress.
+ :param content_type: mime-type of compression method to use.
+
+ """
encoder, content_type = get_encoder(content_type)
return encoder(body), content_type
def decompress(body, content_type):
+ """Decompress compressed text.
+
+ :param body: Previously compressed text to uncompress.
+ :param content_type: mime-type of compression method used.
+
+ """
return get_decoder(content_type)(body)
diff --git a/kombu/connection.py b/kombu/connection.py
index c30db523..7ab6f8a9 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -1,3 +1,13 @@
+"""
+kombu.connection
+================
+
+Broker connection and pools.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
import socket
from copy import copy
@@ -16,31 +26,22 @@ class BrokerConnection(object):
:keyword hostname: Hostname/address of the server to connect to.
Default is ``"localhost"``.
-
:keyword userid: Username. Default is ``"guest"``.
-
:keyword password: Password. Default is ``"guest"``.
-
:keyword virtual_host: Virtual host. Default is ``"/"``.
-
:keyword port: Port of the server. Default is transport specific.
-
:keyword insist: Insist on connecting to a server.
In a configuration with multiple load-sharing servers, the insist
option tells the server that the client is insisting on a connection
to the specified server. Default is ``False``.
-
:keyword ssl: Use ssl to connect to the server. Default is ``False``.
-
:keyword transport: Transport class to use. Can be a class,
or a string specifying the path to the class. (e.g.
``kombu.transport.pyamqplib.Transport``), or one of the aliases:
``amqplib``, ``pika``, ``redis``, ``memory``.
-
:keyword connect_timeout: Timeout in seconds for connecting to the
server. May not be suported by the specified transport.
-
**Usage**
Creating a connection::
@@ -48,7 +49,8 @@ class BrokerConnection(object):
>>> conn = BrokerConnection("rabbit.example.com")
The connection is established lazily when needed. If you need the
- connection to be established, then do so expliclty using :meth:`connect`::
+ connection to be established, then force it to do so using
+ :meth:`connect`::
>>> conn.connect()
@@ -99,8 +101,7 @@ class BrokerConnection(object):
"""
return self.transport.drain_events(self.connection, **kwargs)
- def close(self):
- """Close the connection (if open)."""
+ def _close(self):
try:
if self._connection:
try:
@@ -112,6 +113,11 @@ class BrokerConnection(object):
pass
self._closed = True
+ def release(self):
+ """Close the connection (if open)."""
+ self._close()
+ close = release
+
def ensure_connection(self, errback=None, max_retries=None,
interval_start=2, interval_step=2, interval_max=30):
"""Ensure we have a connection to the server.
@@ -205,12 +211,7 @@ class BrokerConnection(object):
def create_transport(self):
return self.get_transport_cls()(client=self)
- create_backend = create_transport # FIXME
-
- def clone(self, **kwargs):
- """Create a copy of the connection with the same connection
- settings."""
- return self.__class__(**dict(self.info(), **kwargs))
+ create_backend = create_transport # FIXME
def get_transport_cls(self):
"""Get the currently used transport class."""
@@ -219,6 +220,11 @@ class BrokerConnection(object):
transport_cls = get_transport_cls(transport_cls)
return transport_cls
+ def clone(self, **kwargs):
+ """Create a copy of the connection with the same connection
+ settings."""
+ return self.__class__(**dict(self.info(), **kwargs))
+
def info(self):
"""Get connection info."""
transport_cls = self.transport_cls or "amqplib"
@@ -240,7 +246,7 @@ class BrokerConnection(object):
:keyword limit: Maximum number of active connections.
Default is no limit.
- :keyword preload: Number of connections to preload
+ :keyword preload: Number of connections to preload
when the pool is created. Default is 0.
*Example usage*::
@@ -267,7 +273,7 @@ class BrokerConnection(object):
:keyword limit: Maximum number of active channels.
Default is no limit.
- :keyword preload: Number of channels to preload
+ :keyword preload: Number of channels to preload
when the pool is created. Default is 0.
*Example usage*::
@@ -344,6 +350,7 @@ class BrokerConnection(object):
return "<BrokerConnection: %s>" % (
", ".join("%s=%r" % (item, info[item])
for item in info.keys()[:8]))
+
def __copy__(self):
"""``x.__copy__() <==> copy(x)``"""
return self.clone()
@@ -355,10 +362,17 @@ class BrokerConnection(object):
return self
def __exit__(self, *args):
- self.close()
+ self.release()
@property
def connection(self):
+ """The underlying connection object.
+
+ .. warning::
+ This instance is transport specific, so do not
+ depend on the interface of this object.
+
+ """
if self._closed:
return
if not self._connection:
@@ -449,7 +463,6 @@ class Resource(object):
self._resource.put_nowait(resource)
-
class ConnectionPool(Resource):
LimitExceeded = exceptions.ConnectionLimitExceeded
diff --git a/kombu/entity.py b/kombu/entity.py
index 6cc44d63..20f6f097 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -1,3 +1,13 @@
+"""
+kombu.entity
+================
+
+Exchange and Queue declarations.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from kombu.abstract import MaybeChannelBound
TRANSIENT_DELIVERY_MODE = 1
@@ -7,7 +17,7 @@ DELIVERY_MODES = {"transient": TRANSIENT_DELIVERY_MODE,
class Exchange(MaybeChannelBound):
- """An Exchange.
+ """An Exchange declaration.
:keyword name: See :attr:`name`.
:keyword type: See :attr:`type`.
@@ -28,34 +38,34 @@ class Exchange(MaybeChannelBound):
also define additional exchange types, so see your broker
manual for more information about available exchange types.
- * ``direct`` (*default*)
+ * `direct` (*default*)
Direct match between the routing key in the message, and the
routing criteria used when a queue is bound to this exchange.
- * ``topic``
+ * `topic`
Wildcard match between the routing key and the routing pattern
specified in the exchange/queue binding. The routing key is
- treated as zero or more words delimited by ``"."`` and
- supports special wildcard characters. ``"*"`` matches a
- single word and ``"#"`` matches zero or more words.
+ treated as zero or more words delimited by `"."` and
+ supports special wildcard characters. `"*"` matches a
+ single word and `"#"` matches zero or more words.
- * ``fanout``
+ * `fanout`
Queues are bound to this exchange with no arguments. Hence any
message sent to this exchange will be forwarded to all queues
bound to this exchange.
- * ``headers``
+ * `headers`
Queues are bound to this exchange with a table of arguments
containing headers and values (optional). A special argument
named "x-match" determines the matching algorithm, where
- ``"all"`` implies an ``AND`` (all pairs must match) and
- ``"any"`` implies ``OR`` (at least one pair must match).
+ `"all"` implies an `AND` (all pairs must match) and
+ `"any"` implies `OR` (at least one pair must match).
- :attr:`arguments`` is used to specify the arguments.
+ :attr:`arguments` is used to specify the arguments.
This description of AMQP exchange types was shamelessly stolen
from the blog post `AMQP in 10 minutes: Part 4`_ by
@@ -72,18 +82,19 @@ class Exchange(MaybeChannelBound):
Durable exchanges remain active when a server restarts. Non-durable
exchanges (transient exchanges) are purged when a server restarts.
- Default is ``True``.
+ Default is :const:`True`.
.. attribute:: auto_delete
If set, the exchange is deleted when all queues have finished
- using it. Default is ``False``.
+ using it. Default is :const:`False`.
.. attribute:: delivery_mode
- The default delivery mode used for messages. The value is an integer.
+ The default delivery mode used for messages. The value is an integer,
+ or alias string.
- * 1 or "transient"
+ * 1 or `"transient"`
The message is transient. Which means it is stored in
memory only, and is lost if the server dies or restarts.
@@ -93,38 +104,16 @@ class Exchange(MaybeChannelBound):
stored both in-memory, and on disk, and therefore
preserved if the server dies or restarts.
- The default value is ``2`` (persistent).
+ The default value is 2 (persistent).
.. attribute:: arguments
Additional arguments to specify when the exchange is declared.
-
- **Usage**
-
- Example creating an exchange declaration::
-
- >>> news_exchange = Exchange("news", type="topic")
-
- For now ``news_exchange`` is just a declaration, you can't perform
- actions on it. It just describes the name and options for the exchange.
-
- The exchange can be bound or unbound. Bound means the exchange is
- associated with a channel and operations can be performed on it.
- To bind the exchange you call the exchange with the channel as argument::
-
- >>> bound_exchange = news_exchange(channel)
-
- Now you can perform operations like :meth:`declare` or :meth:`delete`::
-
- >>> bound_exchange.declare()
- >>> message = bound_exchange.Message("Cure for cancer found!")
- >>> bound_exchange.publish(message, routing_key="news.science")
- >>> bound_exchange.delete()
-
"""
TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE
+
name = ""
type = "direct"
durable = True
@@ -150,7 +139,7 @@ class Exchange(MaybeChannelBound):
Creates the exchange on the broker.
:keyword nowait: If set the server will not respond, and a
- response will not be waited for. Default is ``False``.
+ response will not be waited for. Default is :const:`False`.
"""
return self.channel.exchange_declare(exchange=self.name,
@@ -160,9 +149,9 @@ class Exchange(MaybeChannelBound):
arguments=self.arguments,
nowait=nowait)
- def Message(self, body, delivery_mode=None,
- priority=None, content_type=None, content_encoding=None,
- properties=None, headers=None):
+ def Message(self, body, delivery_mode=None, priority=None,
+ content_type=None, content_encoding=None, properties=None,
+ headers=None):
"""Create message instance to be sent with :meth:`publish`.
:param body: Message body.
@@ -170,7 +159,7 @@ class Exchange(MaybeChannelBound):
:keyword delivery_mode: Set custom delivery mode. Defaults
to :attr:`delivery_mode`.
- :keyword priority: Message priority, ``0`` to ``9``. (currently not
+ :keyword priority: Message priority, 0 to 9. (currently not
supported by RabbitMQ).
:keyword content_type: The messages content_type. If content_type
@@ -219,10 +208,10 @@ class Exchange(MaybeChannelBound):
"""Delete the exchange declaration on server.
:keyword if_unused: Delete only if the exchange has no bindings.
- Default is ``False``.
+ Default is :const:`False`.
:keyword nowait: If set the server will not respond, and a
- response will not be waited for. Default is ``False``.
+ response will not be waited for. Default is :const:`False`.
"""
return self.channel.exchange_delete(exchange=self.name,
@@ -275,12 +264,12 @@ class Queue(MaybeChannelBound):
Matches the routing key property of the message by a primitive
pattern matching scheme. The message routing key then consists
- of words separated by dots (``"."``, like domain names), and
- two special characters are available; star (``"*"``) and hash
- (``"#"``). The star matches any word, and the hash matches
- zero or more words. For example ``"*.stock.#"`` matches the
- routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not
- ``"stock.nasdaq"``.
+ of words separated by dots (`"."`, like domain names), and
+ two special characters are available; star (`"*"`) and hash
+ (`"#"`). The star matches any word, and the hash matches
+ zero or more words. For example `"*.stock.#"` matches the
+ routing keys `"usd.stock"` and `"eur.stock.db"` but not
+ `"stock.nasdaq"`.
.. attribute:: channel
@@ -295,7 +284,7 @@ class Queue(MaybeChannelBound):
messages, although it does not make sense to send
persistent messages to a transient queue.
- Default is ``True``.
+ Default is :const:`True`.
.. attribute:: exclusive
@@ -303,7 +292,7 @@ class Queue(MaybeChannelBound):
current connection. Setting the 'exclusive' flag
always implies 'auto-delete'.
- Default is ``False``.
+ Default is :const:`False`.
.. attribute:: auto_delete
@@ -321,32 +310,7 @@ class Queue(MaybeChannelBound):
Additional arguments used when binding the queue.
- **Usage**
-
- Example creating a queue using our exchange in the :class:`Exchange`
- example::
-
- >>> science_news = Queue("science_news",
- ... exchange=news_exchange,
- ... routing_key="news.science")
-
- For now ``science_news`` is just a declaration, you can't perform
- actions on it. It just describes the name and options for the queue.
-
- The queue can be bound or unbound. Bound means the queue is
- associated with a channel and operations can be performed on it.
- To bind the queue you call the queue instance with the channel as
- an argument::
-
- >>> bound_science_news = science_news(channel)
-
- Now you can perform operations like :meth:`declare` or :meth:`purge`::
-
- >>> bound_sicence_news.declare()
- >>> bound_science_news.purge()
- >>> bound_science_news.delete()
"""
-
name = ""
exchange = None
routing_key = ""
@@ -389,7 +353,7 @@ class Queue(MaybeChannelBound):
"""Declare queue on the server.
:keyword nowait: Do not wait for a reply.
- :keyword passive: If set, the server will not create the queue.
+ :keyword passive: If set, the server will not create the queue.
The client can use this to check whether a queue exists
without modifying the server state.
@@ -414,7 +378,6 @@ class Queue(MaybeChannelBound):
arguments=self.binding_arguments,
nowait=nowait)
-
def get(self, no_ack=None):
"""Poll the server for a new message.
@@ -497,6 +460,4 @@ class Queue(MaybeChannelBound):
return super(Queue, self).__repr__(
"Queue %s -> %s -> %s" % (self.name,
self.exchange,
- self.routing_key))
-
-Binding = Queue
+ self.routing_key))
diff --git a/kombu/exceptions.py b/kombu/exceptions.py
index fe88ac38..fb4db655 100644
--- a/kombu/exceptions.py
+++ b/kombu/exceptions.py
@@ -1,17 +1,33 @@
+"""
+kombu.exceptions
+================
+
+Exceptions.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
+
+
class NotBoundError(Exception):
"""Trying to call channel dependent method on unbound entity."""
+ pass
class MessageStateError(Exception):
"""The message has already been acknowledged."""
+ pass
class EnsureExhausted(Exception):
"""ensure() limit exceeded."""
+ pass
class TimeoutError(Exception):
"""Operation timed out."""
+ pass
class LimitExceeded(Exception):
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 88c4d49c..bac4e320 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -1,9 +1,18 @@
+"""
+kombu.messaging
+===============
+
+Sending and receiving messages.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from itertools import count
-from kombu import serialization
from kombu.compression import compress
from kombu.entity import Exchange, Queue
-from kombu.entity import Binding # TODO Remove
+from kombu.serialization import encode
from kombu.utils import maybe_list
@@ -13,43 +22,38 @@ class Producer(object):
:param channel: Connection channel.
:keyword exchange: Default exchange.
:keyword routing_key: Default routing key.
- :keyword serializer: Default serializer. Default is ``"json"``.
+ :keyword serializer: Default serializer. Default is `"json"`.
:keyword compression: Default compression method. Default is no
compression.
:keyword auto_declare: Automatically declare the exchange
- at instantiation. Default is ``True``.
+ at instantiation. Default is :const:`True`.
:keyword on_return: Callback to call for undeliverable messages,
- when the ``mandatory`` or ``imediate`` arguments to
+ when the `mandatory` or `imediate` arguments to
:meth:`publish` is used. This callback needs the following
- signature: ``(exception, exchange, routing_key, message)``.
-
- .. attribute:: channel
-
- The connection channel to use.
-
- .. attribute:: exchange
+ signature: `(exception, exchange, routing_key, message)`.
- Exchange to publish to.
-
- .. attribute:: routing_key
-
- Default routing key.
+ """
+ #: The connection channel used.
+ channel = None
- .. attribute:: serializer
+ #: Default exchange.
+ exchange = Exchange("")
- Default serializer to use. Default is autodetect.
+ # Default routing key.
+ routing_key = ""
- .. attribute:: auto_declare
+ #: Default serializer to use. Default is JSON.
+ serializer = None
- By default the exchange is declared at instantiation.
- If you want to declare manually you can set this to ``False``.
+ #: Default compression method. Disabled by default.
+ compression = None
- """
- exchange = Exchange("")
- serializer = None
+ #: By default the exchange is declared at instantiation.
+ #: If you want to declare manually then you can set this
+ #: to :const:`False`.
auto_declare = True
- routing_key = ""
- compression = None
+
+ #: Basic return callback.
on_return = None
def __init__(self, channel, exchange=None, routing_key=None,
@@ -71,47 +75,16 @@ class Producer(object):
if self.on_return:
self.channel.events["basic_return"].append(self.on_return)
- def revive(self, channel):
- self.channel = channel
- self.exchange.revive(channel)
-
def declare(self):
"""Declare the exchange.
This is done automatically at instantiation if :attr:`auto_declare`
- is set.
+ is set to :const:`True`.
"""
if self.exchange.name:
self.exchange.declare()
- def _prepare(self, body, serializer=None,
- content_type=None, content_encoding=None, compression=None,
- headers=None):
-
- # No content_type? Then we're serializing the data internally.
- if not content_type:
- serializer = serializer or self.serializer
- (content_type, content_encoding,
- body) = serialization.encode(body, serializer=serializer)
- else:
- # If the programmer doesn't want us to serialize,
- # make sure content_encoding is set.
- if isinstance(body, unicode):
- if not content_encoding:
- content_encoding = 'utf-8'
- body = body.encode(content_encoding)
-
- # If they passed in a string, we can't know anything
- # about it. So assume it's binary data.
- elif not content_encoding:
- content_encoding = 'binary'
-
- if compression:
- body, headers["compression"] = compress(body, compression)
-
- return body, content_type, content_encoding
-
def publish(self, body, routing_key=None, delivery_mode=None,
mandatory=False, immediate=False, priority=0, content_type=None,
content_encoding=None, serializer=None, headers=None,
@@ -151,6 +124,38 @@ class Producer(object):
return self.exchange.publish(message, routing_key, mandatory,
immediate, exchange=exchange)
+ def revive(self, channel):
+ """Revive the producer after connection loss."""
+ self.channel = channel
+ self.exchange.revive(channel)
+
+ def _prepare(self, body, serializer=None,
+ content_type=None, content_encoding=None, compression=None,
+ headers=None):
+
+ # No content_type? Then we're serializing the data internally.
+ if not content_type:
+ serializer = serializer or self.serializer
+ (content_type, content_encoding,
+ body) = encode(body, serializer=serializer)
+ else:
+ # If the programmer doesn't want us to serialize,
+ # make sure content_encoding is set.
+ if isinstance(body, unicode):
+ if not content_encoding:
+ content_encoding = 'utf-8'
+ body = body.encode(content_encoding)
+
+ # If they passed in a string, we can't know anything
+ # about it. So assume it's binary data.
+ elif not content_encoding:
+ content_encoding = 'binary'
+
+ if compression:
+ body, headers["compression"] = compress(body, compression)
+
+ return body, content_type, content_encoding
+
class Consumer(object):
"""Message consumer.
@@ -162,43 +167,38 @@ class Consumer(object):
:keyword callbacks: see :attr:`callbacks`.
:keyword on_decode_error: see :attr:`on_decode_error`.
- .. attribute:: channel
-
- The connection channel to use.
-
- .. attribute:: queues
-
- A single :class:`~kombu.entity.Queue`, or a list of queues to
- consume from.
-
- .. attribute:: auto_declare
-
- By default the entities will be declared at instantiation,
- if you want to handle this manually you can set this to ``False``.
-
- .. attribute:: callbacks
-
- List of callbacks called in order when a message is received.
-
- The signature of the callbacks must take two arguments:
- ``(body, message)``, which is the decoded message body and
- the ``Message`` instance (a subclass of
- :class:`~kombu.transport.base.Message`).
-
- .. attribute:: on_decode_error
-
- Callback called when a message can't be decoded.
+ """
+ #: The connection channel to use.
+ channel = None
- The signature of the callback must take two arguments: ``(message,
- exc)``, which is the message that can't be decoded and the exception
- that occured while trying to decode it.
+ #: A single :class:`~kombu.entity.Queue`, or a list of queues to
+ #: consume from.
+ queues = None
- """
+ #: Flag for message acknowledgment disabled/enabled.
+ #: Enabled by default.
no_ack = False
+
+ #: By default all entities will be declared at instantiation, if you
+ #: want to handle this manually you can set this to :const:`False`.
auto_declare = True
+
+ #: List of callbacks called in order when a message is received.
+ #:
+ #: The signature of the callbacks must take two arguments:
+ #: `(body, message)`, which is the decoded message body and
+ #: the `Message` instance (a subclass of
+ #: :class:`~kombu.transport.base.Message`).
callbacks = None
+
+ #: Callback called when a message can't be decoded.
+ #:
+ #: The signature of the callback must take two arguments: `(message,
+ #: exc)`, which is the message that can't be decoded and the exception
+ #: that occured while trying to decode it.
on_decode_error = None
- _next_tag = count(1).next # global
+
+ _next_tag = count(1).next # global
def __init__(self, channel, queues, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None):
@@ -223,11 +223,6 @@ class Consumer(object):
if self.auto_declare:
self.declare()
- def revive(self, channel):
- for queue in self.queues:
- queue.revive(channel)
- self.channel = channel
-
def declare(self):
"""Declare queues, exchanges and bindings.
@@ -238,62 +233,28 @@ class Consumer(object):
for queue in self.queues:
queue.declare()
- def _basic_consume(self, queue, consumer_tag=None,
- no_ack=no_ack, nowait=True):
- if queue.name not in self._active_tags:
- queue.consume(self._add_tag(queue, consumer_tag),
- self._receive_callback,
- no_ack=no_ack,
- nowait=nowait)
-
- def consume(self, no_ack=None):
- """Register consumer on server.
-
- """
- if no_ack is None:
- no_ack = self.no_ack
- H, T = self.queues[:-1], self.queues[-1]
- for queue in H:
- self._basic_consume(queue, no_ack=no_ack, nowait=True)
- self._basic_consume(T, no_ack=no_ack, nowait=False)
-
- def receive(self, body, message):
- """Method called when a message is received.
-
- This dispatches to the registered :attr:`callbacks`.
-
- :param body: The decoded message body.
- :param message: The ``Message`` instance.
-
- :raises NotImplementedError: If no consumer callbacks have been
- registered.
-
- """
- if not self.callbacks:
- raise NotImplementedError("No consumer callbacks registered")
- for callback in self.callbacks:
- callback(body, message)
-
def register_callback(self, callback):
"""Register a new callback to be called when a message
is received.
The signature of the callback needs to accept two arguments:
- ``(body, message)``, which is the decoded message body
- and the ``Message`` instance (a subclass of
+ `(body, message)`, which is the decoded message body
+ and the `Message` instance (a subclass of
:class:`~kombu.transport.base.Message`.
"""
self.callbacks.append(callback)
- def purge(self):
- """Purge messages from all queues.
-
- **WARNING**: This will *delete all ready messages*, there is no
- undo operation available.
+ def consume(self, no_ack=None):
+ """Register consumer on server.
"""
- return sum(queue.purge() for queue in self.queues)
+ if no_ack is None:
+ no_ack = self.no_ack
+ H, T = self.queues[:-1], self.queues[-1]
+ for queue in H:
+ self._basic_consume(queue, no_ack=no_ack, nowait=True)
+ self._basic_consume(T, no_ack=no_ack, nowait=False)
def cancel(self):
"""End all active queue consumers.
@@ -307,8 +268,19 @@ class Consumer(object):
self._active_tags.clear()
def cancel_by_queue(self, queue):
+ """Cancel consumer by queue name."""
self.channel.basic_cancel(self._active_tags[queue])
+ def purge(self):
+ """Purge messages from all queues.
+
+ .. warning::
+ This will *delete all ready messages*, there is no
+ undo operation available.
+
+ """
+ return sum(queue.purge() for queue in self.queues)
+
def flow(self, active):
"""Enable/disable flow from peer.
@@ -359,13 +331,44 @@ class Consumer(object):
on the specified channel.
:keyword requeue: By default the messages will be redelivered
- to the original recipient. With ``requeue`` set to true, the
+ to the original recipient. With `requeue` set to true, the
server will attempt to requeue the message, potentially then
delivering it to an alternative subscriber.
"""
return self.channel.basic_recover(requeue=requeue)
+ def receive(self, body, message):
+ """Method called when a message is received.
+
+ This dispatches to the registered :attr:`callbacks`.
+
+ :param body: The decoded message body.
+ :param message: The `Message` instance.
+
+ :raises NotImplementedError: If no consumer callbacks have been
+ registered.
+
+ """
+ if not self.callbacks:
+ raise NotImplementedError("No consumer callbacks registered")
+ for callback in self.callbacks:
+ callback(body, message)
+
+ def revive(self, channel):
+ """Revive consumer after connection loss."""
+ for queue in self.queues:
+ queue.revive(channel)
+ self.channel = channel
+
+ def _basic_consume(self, queue, consumer_tag=None,
+ no_ack=no_ack, nowait=True):
+ if queue.name not in self._active_tags:
+ queue.consume(self._add_tag(queue, consumer_tag),
+ self._receive_callback,
+ no_ack=no_ack,
+ nowait=nowait)
+
def _add_tag(self, queue, consumer_tag=None):
tag = consumer_tag or str(self._next_tag())
self._active_tags[queue.name] = tag
@@ -388,3 +391,6 @@ class Consumer(object):
def __exit__(self, *args):
self.cancel()
+
+ def __repr__(self):
+ return "<Consumer: %s>" % (self.queues, )
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index d8af9c0f..8bb4af42 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -1,44 +1,16 @@
"""
+kombu.pidbox
+===============
-Creating the applications Mailbox
-=================================
+Generic process mailbox.
-::
-
- >>> mailbox = pidbox.Mailbox("celerybeat", type="direct")
-
- >>> @mailbox.handler
- >>> def reload_schedule(state, **kwargs):
- ... state["beat"].reload_schedule()
-
- >>> @mailbox.handler
- >>> def connection_info(state, **kwargs):
- ... return {"connection": state["connection"].info()}
-
-Example Node
-============
-
-::
- >>> connection = kombu.BrokerConnection()
- >>> state = {"beat": beat,
- "connection": connection}
- >>> consumer = mailbox(connection).Node(hostname).listen()
- >>> try:
- ... while True:
- ... connection.drain_events(timeout=1)
- ... finally:
- ... consumer.cancel()
-
-Example Client
-==============
-
-::
- >>> mailbox.cast("reload_schedule") # cast is async.
- >>> info = celerybeat.call("connection_info", timeout=1)
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
"""
import socket
+
from copy import copy
from itertools import count
@@ -49,6 +21,21 @@ from kombu.utils import gen_unique_id, kwdict
class Node(object):
+ #: hostname of the node.
+ hostname = None
+
+ #: the :class:`Mailbox` this is a node for.
+ mailbox = None
+
+ #: map of method name/handlers.
+ handlers = None
+
+ #: current context (passed on to handlers)
+ state = None
+
+ #: current channel.
+ channel = None
+
def __init__(self, hostname, state=None, channel=None, handlers=None,
mailbox=None):
self.channel = channel
@@ -65,6 +52,10 @@ class Node(object):
[self.mailbox.get_queue(self.hostname)],
**options)
+ def handler(self, fun):
+ self.handlers[fun.__name__] = fun
+ return fun
+
def listen(self, channel=None, callback=None):
callback = callback or self.handle_message
consumer = self.Consumer(channel=channel,
@@ -72,10 +63,6 @@ class Node(object):
consumer.consume()
return consumer
- def reply(self, data, exchange, routing_key, **kwargs):
- self.mailbox._publish_reply(data, exchange, routing_key,
- channel=self.channel)
-
def dispatch_from_message(self, message):
message = dict(message)
method = message["method"]
@@ -99,28 +86,43 @@ class Node(object):
self.reply({self.hostname: reply}, **reply_to)
return reply
+ def handle(self, method, arguments={}):
+ return self.handlers[method](self.state, **arguments)
+
def handle_call(self, method, arguments):
return self.handle(method, arguments)
def handle_cast(self, method, arguments):
return self.handle(method, arguments)
- def handler(self, fun):
- self.handlers[fun.__name__] = fun
- return fun
-
- def handle(self, method, arguments={}):
- return self.handlers[method](self.state, **arguments)
-
def handle_message(self, message_data, message):
self.dispatch_from_message(message_data)
+ def reply(self, data, exchange, routing_key, **kwargs):
+ self.mailbox._publish_reply(data, exchange, routing_key,
+ channel=self.channel)
+
class Mailbox(object):
node_cls = Node
exchange_fmt = "%s.pidbox"
reply_exchange_fmt = "reply.%s.pidbox"
+ #: Name of application.
+ namespace = None
+
+ #: Connection (if bound).
+ connection = None
+
+ #: Exchange type (usually direct, or fanout for broadcast).
+ type = "direct"
+
+ #: mailbox exchange (init by constructor).
+ exchange = None
+
+ #: exchange to send replies to.
+ reply_exchange = None
+
def __init__(self, namespace, type="direct", connection=None):
self.namespace = namespace
self.connection = connection
@@ -168,7 +170,6 @@ class Mailbox(object):
return Queue("%s.%s.pidbox" % (hostname, self.namespace),
exchange=self.exchange)
-
def _publish_reply(self, reply, exchange, routing_key, channel=None):
chan = channel or self.connection.channel()
try:
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 0e6d1f66..2e5507aa 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -1,31 +1,20 @@
"""
-Centralized support for encoding/decoding of data structures.
-Requires a json library (`cjson`_, `simplejson`_, or `Python 2.6+`_).
+kombu.serialization
+===================
-Pickle support is built-in.
+Serialization utilities.
-Optionally installs support for ``YAML`` if the `PyYAML`_ package
-is installed.
-
-Optionally installs support for `msgpack`_ if the `msgpack-python`_
-package is installed.
-
-.. _`cjson`: http://pypi.python.org/pypi/python-cjson/
-.. _`simplejson`: http://code.google.com/p/simplejson/
-.. _`Python 2.6+`: http://docs.python.org/library/json.html
-.. _`PyYAML`: http://pyyaml.org/
-.. _`msgpack`: http://msgpack.sourceforge.net/
-.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/
+:copyright: (c) 2009 - 2010
+:license: BSD, see LICENSE for more details.
"""
import codecs
-__all__ = ['SerializerNotInstalled', 'registry']
-
class SerializerNotInstalled(StandardError):
"""Support for the requested serialization type is not installed"""
+ pass
class SerializerRegistry(object):
@@ -40,28 +29,6 @@ class SerializerRegistry(object):
def register(self, name, encoder, decoder, content_type,
content_encoding='utf-8'):
- """Register a new encoder/decoder.
-
- :param name: A convenience name for the serialization method.
-
- :param encoder: A method that will be passed a python data structure
- and should return a string representing the serialized data.
- If ``None``, then only a decoder will be registered. Encoding
- will not be possible.
-
- :param decoder: A method that will be passed a string representing
- serialized data and should return a python data structure.
- If ``None``, then only an encoder will be registered.
- Decoding will not be possible.
-
- :param content_type: The mime-type describing the serialized
- structure.
-
- :param content_encoding: The content encoding (character set) that
- the :param:`decoder` method will be returning. Will usually be
- ``utf-8``, ``us-ascii``, or ``binary``.
-
- """
if encoder:
self._encoders[name] = (content_type, content_encoding, encoder)
if decoder:
@@ -72,7 +39,7 @@ class SerializerRegistry(object):
Set the default serialization method used by this library.
:param name: The name of the registered serialization method.
- For example, ``json`` (default), ``pickle``, ``yaml``,
+ For example, `json` (default), `pickle`, `yaml`, `msgpack`,
or any custom methods registered using :meth:`register`.
:raises SerializerNotInstalled: If the serialization method
@@ -86,34 +53,6 @@ class SerializerRegistry(object):
"No encoder installed for %s" % name)
def encode(self, data, serializer=None):
- """
- Serialize a data structure into a string suitable for sending
- as an AMQP message body.
-
- :param data: The message data to send. Can be a list,
- dictionary or a string.
-
- :keyword serializer: An optional string representing
- the serialization method you want the data marshalled
- into. (For example, ``json``, ``raw``, or ``pickle``).
-
- If ``None`` (default), then `JSON`_ will be used, unless
- ``data`` is a ``str`` or ``unicode`` object. In this
- latter case, no serialization occurs as it would be
- unnecessary.
-
- Note that if ``serializer`` is specified, then that
- serialization method will be used even if a ``str``
- or ``unicode`` object is passed in.
-
- :returns: A three-item tuple containing the content type
- (e.g., ``application/json``), content encoding, (e.g.,
- ``utf-8``) and a string containing the serialized
- data.
-
- :raises SerializerNotInstalled: If the serialization method
- requested is not available.
- """
if serializer == "raw":
return raw_encode(data)
if serializer and not self._encoders.get(serializer):
@@ -145,19 +84,6 @@ class SerializerRegistry(object):
return content_type, content_encoding, payload
def decode(self, data, content_type, content_encoding):
- """Deserialize a data stream as serialized using ``encode``
- based on :param:`content_type`.
-
- :param data: The message data to deserialize.
-
- :param content_type: The content-type of the data.
- (e.g., ``application/json``).
-
- :param content_encoding: The content-encoding of the data.
- (e.g., ``utf-8``, ``binary``, or ``us-ascii``).
-
- :returns: The unserialized data.
- """
content_type = content_type or 'application/data'
content_encoding = (content_encoding or 'utf-8').lower()
@@ -182,23 +108,87 @@ Global registry of serializers/deserializers.
"""
registry = SerializerRegistry()
+
"""
.. function:: encode(data, serializer=default_serializer)
-Encode data using the registry's default encoder.
+ Serialize a data structure into a string suitable for sending
+ as an AMQP message body.
+
+ :param data: The message data to send. Can be a list,
+ dictionary or a string.
+
+ :keyword serializer: An optional string representing
+ the serialization method you want the data marshalled
+ into. (For example, `json`, `raw`, or `pickle`).
+
+ If :const:`None` (default), then json will be used, unless
+ `data` is a :class:`str` or :class:`unicode` object. In this
+ latter case, no serialization occurs as it would be
+ unnecessary.
+
+ Note that if `serializer` is specified, then that
+ serialization method will be used even if a :class:`str`
+ or :class:`unicode` object is passed in.
+ :returns: A three-item tuple containing the content type
+ (e.g., `application/json`), content encoding, (e.g.,
+ `utf-8`) and a string containing the serialized
+ data.
+
+ :raises SerializerNotInstalled: If the serialization method
+ requested is not available.
"""
encode = registry.encode
"""
.. function:: decode(data, content_type, content_encoding):
-Decode data using the registry's default decoder.
+ Deserialize a data stream as serialized using `encode`
+ based on `content_type`.
+
+ :param data: The message data to deserialize.
+
+ :param content_type: The content-type of the data.
+ (e.g., `application/json`).
+
+ :param content_encoding: The content-encoding of the data.
+ (e.g., `utf-8`, `binary`, or `us-ascii`).
+
+ :returns: The unserialized data.
"""
decode = registry.decode
+"""
+.. function:: register(name, encoder, decoder, content_type,
+ content_encoding="utf-8"):
+ Register a new encoder/decoder.
+
+ :param name: A convenience name for the serialization method.
+
+ :param encoder: A method that will be passed a python data structure
+ and should return a string representing the serialized data.
+ If :const:`None`, then only a decoder will be registered. Encoding
+ will not be possible.
+
+ :param decoder: A method that will be passed a string representing
+ serialized data and should return a python data structure.
+ If :const:`None`, then only an encoder will be registered.
+ Decoding will not be possible.
+
+ :param content_type: The mime-type describing the serialized
+ structure.
+
+ :param content_encoding: The content encoding (character set) that
+ the `decoder` method will be returning. Will usually be
+ utf-8`, `us-ascii`, or `binary`.
+
+ """
+register = registry.register
+
+
def raw_encode(data):
"""Special case serializer."""
content_type = 'application/data'
diff --git a/kombu/simple.py b/kombu/simple.py
index e3968c50..eae5a5b6 100644
--- a/kombu/simple.py
+++ b/kombu/simple.py
@@ -1,3 +1,13 @@
+"""
+kombu.simple
+============
+
+Simple interface.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
import socket
from collections import deque
@@ -22,20 +32,6 @@ class SimpleBase(object):
self.buffer = deque()
self.consumer.register_callback(self._receive)
- def _receive(self, message_data, message):
- self.buffer.append(message)
-
- def _consume(self):
- if not self._consuming:
- self.consumer.consume(no_ack=self.no_ack)
- self._consuming = True
-
- def get_nowait(self):
- m = self.queue.get(no_ack=self.no_ack)
- if not m:
- raise Empty()
- return m
-
def get(self, block=True, timeout=None, sync=False):
if block:
return self.get_nowait()
@@ -54,6 +50,12 @@ class SimpleBase(object):
elapsed += time() - time_start
remaining = timeout - elapsed
+ def get_nowait(self):
+ m = self.queue.get(no_ack=self.no_ack)
+ if not m:
+ raise Empty()
+ return m
+
def put(self, message, serializer=None, headers=None, compression=None,
routing_key=None, **kwargs):
self.producer.publish(message,
@@ -75,10 +77,19 @@ class SimpleBase(object):
self.channel.close()
self.consumer.cancel()
+ def _receive(self, message_data, message):
+ self.buffer.append(message)
+
+ def _consume(self):
+ if not self._consuming:
+ self.consumer.consume(no_ack=self.no_ack)
+ self._consuming = True
+
def __del__(self):
self.close()
def __len__(self):
+ """`len(self) -> self.qsize()`"""
return self.qsize()
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index b2116edb..00f62e7f 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -28,7 +28,7 @@ class test_Connection(unittest.TestCase):
self.assertTrue(conn.connection.connected)
conn.__exit__()
self.assertIsNone(conn.connection)
- conn.close() # again
+ conn.close() # again
class ResourceCase(unittest.TestCase):
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 1f8130e4..a90dc51b 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -240,12 +240,12 @@ class test_Consumer(unittest.TestCase):
channel = self.connection.channel()
b1 = Queue("qname1", self.exchange, "rkey")
consumer = Consumer(channel, [b1])
-
received = []
+
def callback(message_data, message):
received.append(message_data)
message.ack()
- message.payload # trigger cache
+ message.payload # trigger cache
consumer.register_callback(callback)
consumer._receive_callback({u"foo": u"bar"})
@@ -339,8 +339,8 @@ class test_Consumer(unittest.TestCase):
b1 = Queue("qname1", self.exchange, "rkey")
consumer = Consumer(channel, [b1])
consumer.channel.throw_decode_error = True
-
thrown = []
+
def on_decode_error(msg, exc):
thrown.append((msg.body, exc))
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 4e406149..a6de13c9 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -1,3 +1,13 @@
+"""
+kombu.transport
+===============
+
+Built-in transports.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
import sys
from kombu.utils import rpartition
@@ -37,7 +47,7 @@ def get_transport_cls(transport=None):
"kombu.transport.pyamqplib.Transport"
- If the name does not include "``.``" (is not fully qualified),
+ If the name does not include `"."` (is not fully qualified),
the alias table will be consulted.
"""
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 23033394..2fcdf35b 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -1,8 +1,14 @@
"""
+kombu.transport.base
+====================
-Transport base classes.
+Base transport interface.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
"""
+
from kombu import serialization
from kombu.compression import decompress
from kombu.exceptions import MessageStateError
@@ -16,6 +22,30 @@ class Message(object):
MessageStateError = MessageStateError
+ #: The channel the message was received on.
+ channel = None
+
+ #: Delivery tag used to identify the message in this channel.
+ delivery_tag = None
+
+ #: Content type used to identify the type of content.
+ content_type = None
+
+ #: Content encoding used to identify the text encoding of the body.
+ content_encoding = None
+
+ #: Additional delivery information.
+ delivery_info = None
+
+ #: Message headers
+ headers = None
+
+ #: Application properties
+ properties = None
+
+ #: Raw message body (may be serialized), see :attr:`payload` instead.
+ body = None
+
def __init__(self, channel, body=None, delivery_tag=None,
content_type=None, content_encoding=None, delivery_info={},
properties=None, headers=None,
@@ -35,19 +65,6 @@ class Message(object):
if compression:
self.body = decompress(self.body, compression)
- def decode(self):
- """Deserialize the message body, returning the original
- python structure sent by the publisher."""
- return serialization.decode(self.body, self.content_type,
- self.content_encoding)
-
- @property
- def payload(self):
- """The decoded message."""
- if not self._decoded_cache:
- self._decoded_cache = self.decode()
- return self._decoded_cache
-
def ack(self):
"""Acknowledge this message as being processed.,
This will remove the message from the queue.
@@ -93,29 +110,54 @@ class Message(object):
self.channel.basic_reject(self.delivery_tag, requeue=True)
self._state = "REQUEUED"
+ def decode(self):
+ """Deserialize the message body, returning the original
+ python structure sent by the publisher."""
+ return serialization.decode(self.body, self.content_type,
+ self.content_encoding)
+
@property
def acknowledged(self):
+ """Set to true if the message has been acknowledged."""
return self._state in ACKNOWLEDGED_STATES
+ @property
+ def payload(self):
+ """The decoded message body."""
+ if not self._decoded_cache:
+ self._decoded_cache = self.decode()
+ return self._decoded_cache
+
class Transport(object):
"""Base class for transports."""
+
+ #: The :class:`~kombu.connection.BrokerConnection` owning this instance.
client = None
+
+ #: Default port used when no port has been specified.
default_port = None
+
+ #: Tuple of errors that can happen due to connection failure.
connection_errors = ()
+
+ #: Tuple of errors that can happen due to channel/method failure.
channel_errors = ()
def __init__(self, client, **kwargs):
self.client = client
- def create_channel(self, connection):
+ def establish_connection(self):
raise NotImplementedError("Subclass responsibility")
- def drain_events(self, connection, **kwargs):
+ def close_connection(self, connection):
raise NotImplementedError("Subclass responsibility")
- def establish_connection(self):
+ def create_channel(self, connection):
raise NotImplementedError("Subclass responsibility")
- def close_connection(self, connection):
+ def close_channel(self, connection):
+ raise NotImplementedError("Subclass responsibility")
+
+ def drain_events(self, connection, **kwargs):
raise NotImplementedError("Subclass responsibility")
diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py
index 194c07d6..8f08bcde 100644
--- a/kombu/transport/memory.py
+++ b/kombu/transport/memory.py
@@ -1,3 +1,13 @@
+"""
+kombu.transport.memory
+======================
+
+In-memory transport.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from Queue import Queue
from kombu.transport import virtual
diff --git a/kombu/transport/pyamqplib.py b/kombu/transport/pyamqplib.py
index 8c5e9942..b3521af8 100644
--- a/kombu/transport/pyamqplib.py
+++ b/kombu/transport/pyamqplib.py
@@ -1,3 +1,13 @@
+"""
+kombu.transport.pyamqplib
+=========================
+
+amqplib transport.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
import socket
from amqplib import client_0_8 as amqp
diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py
index 7305cddd..01567bcb 100644
--- a/kombu/transport/pypika.py
+++ b/kombu/transport/pypika.py
@@ -1,3 +1,13 @@
+"""
+kombu.transport.pypika
+======================
+
+Pika transport.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from pika import asyncore_adapter
from pika import blocking_adapter
from pika import channel
@@ -107,7 +117,6 @@ class SyncTransport(base.Transport):
exceptions.UnknownConsumerTag,
exceptions.ProtocolSyntaxError)
-
Message = Message
Connection = BlockingConnection
diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py
index 57ef5f1a..1f155a0a 100644
--- a/kombu/transport/pyredis.py
+++ b/kombu/transport/pyredis.py
@@ -1,3 +1,13 @@
+"""
+kombu.transport.pyredis
+=======================
+
+Redis transport.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
from Queue import Empty
from anyjson import serialize, deserialize
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 29557977..38dccb38 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -1,12 +1,13 @@
"""
- kombu.transport.virtual
- ~~~~~~~~~~~~~~~~~~~~~~~
+kombu.transport.virtual
+=======================
- Virtual transport implementation.
- Emulates the AMQ API for non-AMQ transports.
+Virtual transport implementation.
- :copyright: (c) 2009 - 2010 by Ask Solem.
- :license: BSD, see LICENSE for more details.
+Emulates the AMQ API for non-AMQ transports.
+
+:copyright: (c) 2009, 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
"""
import socket
@@ -49,9 +50,11 @@ class QoS(object):
:param channel: AMQ Channel.
:keyword prefetch_count: Initial prefetch count (defaults to 0).
-
"""
+ #: current prefetch count value
+ prefetch_count = 0
+
#: :class:`~collections.OrderedDict` of active messages.
_delivered = None
@@ -155,34 +158,58 @@ class Message(base.Message):
class AbstractChannel(object):
- """Methods implemented by subclasses of :class:`Channel`."""
+ """This is an abstract class defining the channel methods
+ you'd usually want to implement in a virtual channel.
+
+ Do not subclass directly, but rather inherit from :class:`Channel`
+ instead.
+
+ """
def _get(self, queue):
+ """Get next message from `queue`."""
raise NotImplementedError("Virtual channels must implement _get")
def _put(self, queue, message):
+ """Put `message` onto `queue`."""
raise NotImplementedError("Virtual channels must implement _put")
def _purge(self, queue):
+ """Remove all messages from `queue`."""
raise NotImplementedError("Virtual channels must implement _purge")
def _size(self, queue):
+ """Return the number of messages in `queue` as an :class:`int`."""
return 0
def _delete(self, queue):
+ """Delete `queue`.
+
+ This just purges the queue, if you need to do more you can
+ override this method.
+
+ """
self._purge(queue)
def _new_queue(self, queue, **kwargs):
+ """Create new queue.
+
+ Some implementations needs to do additiona actions when
+ the queue is created. You can do so by overriding this
+ method.
+
+ """
pass
def _poll(self, queues):
+ """Poll a list of queues for available messages."""
return FairCycle(self._get, queues, Empty).get()
class Channel(AbstractChannel):
"""Virtual channel.
- :param connection: The connection this channel is part of.
+ :param connection: The transport instance this channel is part of.
"""
#: message class used.
@@ -208,45 +235,6 @@ class Channel(AbstractChannel):
self.exchange_types = dict((typ, cls(self))
for typ, cls in self.exchange_types.items())
- def get_table(self, exchange):
- """Get table of bindings for `exchange`."""
- return self.state.exchanges[exchange]["table"]
-
- def typeof(self, exchange):
- """Get the exchange type instance for `exchange`."""
- type = self.state.exchanges[exchange]["type"]
- return self.exchange_types[type]
-
- def _lookup(self, exchange, routing_key, default="ae.undeliver"):
- """Find all queues matching `routing_key` for the given `exchange`.
-
- Returns `default` if no queues matched.
-
- """
- table = self.get_table(exchange)
- try:
- return self.typeof(exchange).lookup(table, exchange,
- routing_key, default)
- except KeyError:
- self._new_queue(default)
- return [default]
-
- def _restore(self, message):
- """Redeliver message to its original destination."""
- delivery_info = message.delivery_info
- message = message.serializable()
- message["redelivered"] = True
- for queue in self._lookup(delivery_info["exchange"],
- delivery_info["routing_key"]):
- self._put(queue, message)
-
- def drain_events(self, timeout=None):
- if self._consumers and self.qos.can_consume():
- if hasattr(self, "_get_many"):
- return self._get_many(self._active_queues, timeout=timeout)
- return self._poll(self._active_queues)
- raise Empty()
-
def exchange_declare(self, exchange, type="direct", durable=False,
auto_delete=False, arguments=None, nowait=False):
"""Declare exchange."""
@@ -305,13 +293,33 @@ class Channel(AbstractChannel):
"""Remove all ready messages from queue."""
return self._purge(queue)
- def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
- """Change QoS settings for this channel.
+ def basic_publish(self, message, exchange, routing_key, **kwargs):
+ """Publish message."""
+ message["properties"]["delivery_info"]["exchange"] = exchange
+ message["properties"]["delivery_info"]["routing_key"] = routing_key
+ message["properties"]["delivery_tag"] = self._next_delivery_tag()
+ for queue in self._lookup(exchange, routing_key):
+ self._put(queue, message, **kwargs)
- Only `prefetch_count` is supported.
+ def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
+ """Consume from `queue`"""
+ self._tag_to_queue[consumer_tag] = queue
- """
- self.qos.prefetch_count = prefetch_count
+ def _callback(raw_message):
+ message = self.Message(self, raw_message)
+ if not no_ack:
+ self.qos.append(message, message.delivery_tag)
+ return callback(message)
+
+ self.connection._callbacks[queue] = _callback
+ self._consumers.add(consumer_tag)
+
+ def basic_cancel(self, consumer_tag):
+ """Cancel consumer by consumer tag."""
+ self._consumers.remove(consumer_tag)
+ queue = self._tag_to_queue.pop(consumer_tag, None)
+ if queue:
+ self.connection._callbacks.pop(queue, None)
def basic_get(self, queue, **kwargs):
"""Get message by direct access (synchronous)."""
@@ -334,33 +342,52 @@ class Channel(AbstractChannel):
"""Reject message."""
self.qos.reject(delivery_tag, requeue=requeue)
- def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
- """Start active consumer."""
- self._tag_to_queue[consumer_tag] = queue
+ def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
+ """Change QoS settings for this channel.
- def _callback(raw_message):
- message = self.Message(self, raw_message)
- if not no_ack:
- self.qos.append(message, message.delivery_tag)
- return callback(message)
+ Only `prefetch_count` is supported.
- self.connection._callbacks[queue] = _callback
- self._consumers.add(consumer_tag)
+ """
+ self.qos.prefetch_count = prefetch_count
- def basic_publish(self, message, exchange, routing_key, **kwargs):
- """Publish message."""
- message["properties"]["delivery_info"]["exchange"] = exchange
- message["properties"]["delivery_info"]["routing_key"] = routing_key
- message["properties"]["delivery_tag"] = self._next_delivery_tag()
- for queue in self._lookup(exchange, routing_key):
- self._put(queue, message, **kwargs)
+ def get_table(self, exchange):
+ """Get table of bindings for `exchange`."""
+ return self.state.exchanges[exchange]["table"]
- def basic_cancel(self, consumer_tag):
- """Cancel consumer by consumer tag."""
- self._consumers.remove(consumer_tag)
- queue = self._tag_to_queue.pop(consumer_tag, None)
- if queue:
- self.connection._callbacks.pop(queue, None)
+ def typeof(self, exchange):
+ """Get the exchange type instance for `exchange`."""
+ type = self.state.exchanges[exchange]["type"]
+ return self.exchange_types[type]
+
+ def _lookup(self, exchange, routing_key, default="ae.undeliver"):
+ """Find all queues matching `routing_key` for the given `exchange`.
+
+ Returns `default` if no queues matched.
+
+ """
+ table = self.get_table(exchange)
+ try:
+ return self.typeof(exchange).lookup(table, exchange,
+ routing_key, default)
+ except KeyError:
+ self._new_queue(default)
+ return [default]
+
+ def _restore(self, message):
+ """Redeliver message to its original destination."""
+ delivery_info = message.delivery_info
+ message = message.serializable()
+ message["redelivered"] = True
+ for queue in self._lookup(delivery_info["exchange"],
+ delivery_info["routing_key"]):
+ self._put(queue, message)
+
+ def drain_events(self, timeout=None):
+ if self._consumers and self.qos.can_consume():
+ if hasattr(self, "_get_many"):
+ return self._get_many(self._active_queues, timeout=timeout)
+ return self._poll(self._active_queues)
+ raise Empty()
def message_to_python(self, raw_message):
"""Convert raw message to :class:`Message` instance."""
@@ -383,6 +410,12 @@ class Channel(AbstractChannel):
"properties": properties or {}}
def flow(self, active=True):
+ """Enable/disable message flow.
+
+ :raises NotImplementedError: as flow
+ is not implemented by the base virtual implementation.
+
+ """
raise NotImplementedError("virtual channels does not support flow.")
def close(self):
@@ -415,9 +448,20 @@ class Transport(base.Transport):
:param client: :class:`~kombu.connection.BrokerConnection` instance
"""
+ #: channel class used.
Channel = Channel
+
+ #: cycle class used.
Cycle = FairCycle
+ #: :class:`BrokerState` containing declared exchanges and
+ #: bindings (set by constructor).
+ state = None
+
+ #: :class:`~kombu.transport.virtual.scheduling.FairCycle` instance
+ #: used to fairly drain events from channels (set by constructor).
+ cycle = None
+
#: default interval between polling channels for new events.
interval = 1
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index 975460c5..e3c1f836 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -1,13 +1,12 @@
"""
- kombu.transport.virtual.exchange
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+kombu.transport.virtual.exchange
+================================
- Implementations of the standard exchanges defined
- by the AMQ protocol. Currently excluding the `headers`
- exchange.
+Implementations of the standard exchanges defined
+by the AMQ protocol (excluding the `headers` exchange).
- :copyright: (c) 2009 - 2010 by Ask Solem.
- :license: BSD, see LICENSE for more details.
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
"""
import re
@@ -37,7 +36,7 @@ class ExchangeType(object):
raise NotImplementedError("subclass responsibility")
def prepare_bind(self, queue, exchange, routing_key, arguments):
- """:returns: ``(routing_key, regex, queue)`` tuple to store
+ """:returns: `(routing_key, regex, queue)` tuple to store
for bindings to this exchange."""
return routing_key, None, queue
@@ -63,7 +62,7 @@ class DirectExchange(ExchangeType):
class TopicExchange(ExchangeType):
- """The `topic` exchanges routes based on words separated by dots, and the
+ """The `topic` exchanges routes based on words separated by dots, and
wildcard characters `*` (any single word), and `#` (one or more words)."""
#: map of wildcard to regex conversions
diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py
index ebe2f9a6..e3d4b01a 100644
--- a/kombu/transport/virtual/scheduling.py
+++ b/kombu/transport/virtual/scheduling.py
@@ -1,3 +1,13 @@
+"""
+ kombu.transport.virtual.scheduling
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Consumer utilities.
+
+ :copyright: (c) 2009 - 2010 by Ask Solem.
+ :license: BSD, see LICENSE for more details.
+
+"""
from itertools import count
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 24f967ca..f6752614 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -54,7 +54,7 @@ def repeatlast(it):
yield the last value infinitely."""
for item in it:
yield item
- while 1: # pragma: no cover
+ while 1: # pragma: no cover
yield item
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index a112b65a..85908944 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -65,12 +65,14 @@
# update_wrapper() and wraps() are tools to help write
# wrapper functions that can handle naive introspection
+
def _compat_partial(fun, *args, **kwargs):
"""New function with partial application of the given arguments
and keywords."""
def _curried(*addargs, **addkwargs):
- return fun(*(args+addargs), **dict(kwargs, **addkwargs))
+ return fun(*(args + addargs), **dict(kwargs, **addkwargs))
+
return _curried
@@ -81,6 +83,8 @@ except ImportError:
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
WRAPPER_UPDATES = ('__dict__',)
+
+
def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
updated=WRAPPER_UPDATES):
"""Update a wrapper function to look like the wrapped function
@@ -98,7 +102,7 @@ def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
for attr in assigned:
try:
setattr(wrapper, attr, getattr(wrapped, attr))
- except TypeError: # Python 2.3 doesn't allow assigning to __name__.
+ except TypeError: # Python 2.3 doesn't allow assigning to __name__.
pass
for attr in updated:
getattr(wrapper, attr).update(getattr(wrapped, attr))
diff --git a/pavement.py b/pavement.py
index 5f79ca7e..b0a418fd 100644
--- a/pavement.py
+++ b/pavement.py
@@ -6,6 +6,7 @@ options(
sphinx=Bunch(builddir=".build"),
)
+
def sphinx_builddir(options):
return path("docs") / options.sphinx.builddir / "html"