summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS19
-rw-r--r--Changelog8
-rw-r--r--FAQ19
-rw-r--r--INSTALL21
-rw-r--r--LICENSE28
-rw-r--r--MANIFEST.in12
l---------README1
-rw-r--r--README.rst391
-rw-r--r--THANKS6
-rw-r--r--TODO2
-rwxr-xr-xcontrib/doc2ghpages13
-rw-r--r--contrib/requirements/default.txt2
-rw-r--r--contrib/requirements/test.txt7
-rw-r--r--docs/rewrite.rst41
-rw-r--r--kombu/__init__.py7
-rw-r--r--kombu/backends/__init__.py43
-rw-r--r--kombu/backends/base.py104
-rw-r--r--kombu/backends/emulation.py244
-rw-r--r--kombu/backends/pikachu.py212
-rw-r--r--kombu/backends/pyamqplib.py202
-rw-r--r--kombu/connection.py79
-rw-r--r--kombu/entity.py116
-rw-r--r--kombu/messaging.py136
-rw-r--r--kombu/serialization.py279
-rw-r--r--kombu/utils.py66
-rw-r--r--setup.cfg16
-rw-r--r--setup.py92
27 files changed, 2166 insertions, 0 deletions
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 00000000..284d232b
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,19 @@
+==================================
+ AUTHORS (in chronological order)
+==================================
+
+Ask Solem <askh@opera.com>
+Travis Cline <travis.cline@gmail.com>
+Rune Halvorsen <runeh@opera.com>
+Sean Creeley <sean.creeley@gmail.com>
+Jason Cater <jason@ncsfulfillment.com>
+Ian Struble <istruble@gmail.com>
+Patrick Schneider <patrick.p2k.schneider@gmail.com>
+Travis Swicegood <development@domain51.com>
+Stephen Day <stevvooe@gmail.com>
+Andrew Watts
+Paul McLanahan <paul@mclanahan.net>
+Ralf Nyren <ralf-github@nyren.net>
+Jeff Balogh <me@jeffbalogh.org>
+Adam Wentz
+Vincent Driessen <vincent@datafox.nl>
diff --git a/Changelog b/Changelog
new file mode 100644
index 00000000..66f0f036
--- /dev/null
+++ b/Changelog
@@ -0,0 +1,8 @@
+================
+ Change history
+================
+
+0.1.0
+-----
+
+* Rewrite of carrot
diff --git a/FAQ b/FAQ
new file mode 100644
index 00000000..d8f138c9
--- /dev/null
+++ b/FAQ
@@ -0,0 +1,19 @@
+============================
+ Frequently Asked Questions
+============================
+
+Questions
+=========
+
+Q: Message.reject doesn't work?
+--------------------------------------
+**Answer**: RabbitMQ (as of v1.5.5) has not implemented reject yet.
+There was a brief discussion about it on their mailing list, and the reason
+why it's not implemented yet is revealed:
+
+http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2009-January/003183.html
+
+Q: Message.requeue doesn't work?
+--------------------------------------
+
+**Answer**: See _`Message.reject doesn't work?`
diff --git a/INSTALL b/INSTALL
new file mode 100644
index 00000000..3d86448c
--- /dev/null
+++ b/INSTALL
@@ -0,0 +1,21 @@
+Installation
+============
+
+You can install ``carrot`` either via the Python Package Index (PyPI)
+or from source.
+
+To install using ``pip``,::
+
+ $ pip install carrot
+
+
+To install using ``easy_install``,::
+
+ $ easy_install carrot
+
+
+If you have downloaded a source tarball you can install it
+by doing the following,::
+
+ $ python setup.py build
+ # python setup.py install # as root
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 00000000..81d19cef
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,28 @@
+Copyright (c) 2009, Ask Solem
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+Neither the name of Ask Solem nor the names of its contributors may be used
+to endorse or promote products derived from this software without specific
+prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 00000000..08498736
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,12 @@
+include AUTHORS
+include Changelog
+include FAQ
+include INSTALL
+include LICENSE
+include MANIFEST.in
+include README.rst
+include README
+include THANKS
+include TODO
+recursive-include docs *
+recursive-include kombu *.py
diff --git a/README b/README
new file mode 120000
index 00000000..92cacd28
--- /dev/null
+++ b/README
@@ -0,0 +1 @@
+README.rst \ No newline at end of file
diff --git a/README.rst b/README.rst
new file mode 100644
index 00000000..1678a814
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,391 @@
+##############################################
+ kombu - AMQP Messaging Framework for Python
+##############################################
+
+:Version: 0.10.5
+
+**THIS IS A REWRITE OF CARROT**
+
+
+Introduction
+------------
+
+`carrot` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message
+Queuing Protocol, an open standard protocol for message orientation, queuing,
+routing, reliability and security.
+
+The aim of `carrot` is to make messaging in Python as easy as possible by
+providing a high-level interface for producing and consuming messages. At the
+same time it is a goal to re-use what is already available as much as possible.
+
+`carrot` has pluggable messaging back-ends, so it is possible to support
+several messaging systems. Currently, there is support for `AMQP`_
+(`py-amqplib`_, `pika`_), `STOMP`_ (`python-stomp`_). There's also an
+in-memory backend for testing purposes, using the `Python queue module`_.
+
+Several AMQP message broker implementations exists, including `RabbitMQ`_,
+`ZeroMQ`_ and `Apache ActiveMQ`_. You'll need to have one of these installed,
+personally we've been using `RabbitMQ`_.
+
+Before you start playing with ``carrot``, you should probably read up on
+AMQP, and you could start with the excellent article about using RabbitMQ
+under Python, `Rabbits and warrens`_. For more detailed information, you can
+refer to the `Wikipedia article about AMQP`_.
+
+.. _`RabbitMQ`: http://www.rabbitmq.com/
+.. _`ZeroMQ`: http://www.zeromq.org/
+.. _`AMQP`: http://amqp.org
+.. _`STOMP`: http://stomp.codehaus.org
+.. _`python-stomp`: http://bitbucket.org/asksol/python-stomp
+.. _`Python Queue module`: http://docs.python.org/library/queue.html
+.. _`Apache ActiveMQ`: http://activemq.apache.org/
+.. _`Django`: http://www.djangoproject.com/
+.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
+.. _`py-amqplib`: http://barryp.org/software/py-amqplib/
+.. _`pika`: http://github.com/tonyg/pika
+.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
+
+Documentation
+-------------
+
+Carrot is using Sphinx, and the latest documentation is available at GitHub:
+
+ http://github.com/ask/carrot/
+
+Installation
+============
+
+You can install ``carrot`` either via the Python Package Index (PyPI)
+or from source.
+
+To install using ``pip``,::
+
+ $ pip install carrot
+
+
+To install using ``easy_install``,::
+
+ $ easy_install carrot
+
+
+If you have downloaded a source tarball you can install it
+by doing the following,::
+
+ $ python setup.py build
+ # python setup.py install # as root
+
+
+Terminology
+===========
+
+There are some concepts you should be familiar with before starting:
+
+ * Publishers
+
+ Publishers sends messages to an exchange.
+
+ * Exchanges
+
+ Messages are sent to exchanges. Exchanges are named and can be
+ configured to use one of several routing algorithms. The exchange
+ routes the messages to consumers by matching the routing key in the
+ message with the routing key the consumer provides when binding to
+ the exchange.
+
+ * Consumers
+
+ Consumers declares a queue, binds it to a exchange and receives
+ messages from it.
+
+ * Queues
+
+ Queues receive messages sent to exchanges. The queues are declared
+ by consumers.
+
+ * Routing keys
+
+ Every message has a routing key. The interpretation of the routing
+ key depends on the exchange type. There are four default exchange
+ types defined by the AMQP standard, and vendors can define custom
+ types (so see your vendors manual for details).
+
+ These are the default exchange types defined by AMQP/0.8:
+
+ * Direct exchange
+
+ Matches if the routing key property of the message and
+ the ``routing_key`` attribute of the consumer are identical.
+
+ * Fan-out exchange
+
+ Always matches, even if the binding does not have a routing
+ key.
+
+ * Topic exchange
+
+ Matches the routing key property of the message by a primitive
+ pattern matching scheme. The message routing key then consists
+ of words separated by dots (``"."``, like domain names), and
+ two special characters are available; star (``"*"``) and hash
+ (``"#"``). The star matches any word, and the hash matches
+ zero or more words. For example ``"*.stock.#"`` matches the
+ routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not
+ ``"stock.nasdaq"``.
+
+
+Examples
+========
+
+Creating a connection
+---------------------
+
+ You can set up a connection by creating an instance of
+ ``carrot.messaging.BrokerConnection``, with the appropriate options for
+ your broker:
+
+ >>> from carrot.connection import BrokerConnection
+ >>> conn = BrokerConnection(hostname="localhost", port=5672,
+ ... userid="test", password="test",
+ ... virtual_host="test")
+
+
+ If you're using Django you can use the
+ ``carrot.connection.DjangoBrokerConnection`` class instead, which loads
+ the connection settings from your ``settings.py``::
+
+ BROKER_HOST = "localhost"
+ BROKER_PORT = 5672
+ BROKER_USER = "test"
+ BROKER_PASSWORD = "secret"
+ BROKER_VHOST = "/test"
+
+ Then create a connection by doing:
+
+ >>> from carrot.connection import DjangoBrokerConnection
+ >>> conn = DjangoBrokerConnection()
+
+
+
+Receiving messages using a Consumer
+-----------------------------------
+
+First we open up a Python shell and start a message consumer.
+
+This consumer declares a queue named ``"feed"``, receiving messages with
+the routing key ``"importer"`` from the ``"feed"`` exchange.
+
+The example then uses the consumers ``wait()`` method to go into consume
+mode, where it continuously polls the queue for new messages, and when a
+message is received it passes the message to all registered callbacks.
+
+ >>> from carrot.messaging import Consumer
+ >>> consumer = Consumer(connection=conn, queue="feed",
+ ... exchange="feed", routing_key="importer")
+ >>> def import_feed_callback(message_data, message)
+ ... feed_url = message_data["import_feed"]
+ ... print("Got feed import message for: %s" % feed_url)
+ ... # something importing this feed url
+ ... # import_feed(feed_url)
+ ... message.ack()
+ >>> consumer.register_callback(import_feed_callback)
+ >>> consumer.wait() # Go into the consumer loop.
+
+Sending messages using a Publisher
+----------------------------------
+
+Then we open up another Python shell to send some messages to the consumer
+defined in the last section.
+
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed", routing_key="importer")
+ >>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"})
+ >>> publisher.close()
+
+
+Look in the first Python shell again (where ``consumer.wait()`` is running),
+where the following text has been printed to the screen::
+
+ Got feed import message for: http://cnn.com/rss/edition.rss
+
+
+Serialization of Data
+-----------------------
+
+By default every message is encoded using `JSON`_, so sending
+Python data structures like dictionaries and lists works.
+`YAML`_, `msgpack`_ and Python's built-in ``pickle`` module is also supported,
+and if needed you can register any custom serialization scheme you
+want to use.
+
+.. _`JSON`: http://www.json.org/
+.. _`YAML`: http://yaml.org/
+.. _`msgpack`: http://msgpack.sourceforge.net/
+
+Each option has its advantages and disadvantages.
+
+``json`` -- JSON is supported in many programming languages, is now
+ a standard part of Python (since 2.6), and is fairly fast to
+ decode using the modern Python libraries such as ``cjson or
+ ``simplejson``.
+
+ The primary disadvantage to ``JSON`` is that it limits you to
+ the following data types: strings, unicode, floats, boolean,
+ dictionaries, and lists. Decimals and dates are notably missing.
+
+ Also, binary data will be transferred using base64 encoding, which
+ will cause the transferred data to be around 34% larger than an
+ encoding which supports native binary types.
+
+ However, if your data fits inside the above constraints and
+ you need cross-language support, the default setting of ``JSON``
+ is probably your best choice.
+
+``pickle`` -- If you have no desire to support any language other than
+ Python, then using the ``pickle`` encoding will gain you
+ the support of all built-in Python data types (except class instances),
+ smaller messages when sending binary files, and a slight speedup
+ over ``JSON`` processing.
+
+``yaml`` -- YAML has many of the same characteristics as ``json``,
+ except that it natively supports more data types (including dates,
+ recursive references, etc.)
+
+ However, the Python libraries for YAML are a good bit slower
+ than the libraries for JSON.
+
+ If you need a more expressive set of data types and need to maintain
+ cross-language compatibility, then ``YAML`` may be a better fit
+ than the above.
+
+To instruct carrot to use an alternate serialization method,
+use one of the following options.
+
+ 1. Set the serialization option on a per-Publisher basis:
+
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed", routing_key="importer",
+ ... serializer="yaml")
+
+ 2. Set the serialization option on a per-call basis
+
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed", routing_key="importer")
+ >>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"},
+ ... serializer="pickle")
+ >>> publisher.close()
+
+Note that ``Consumer``s do not need the serialization method specified in
+their code. They can auto-detect the serialization method since we supply
+the ``Content-type`` header as part of the AMQP message.
+
+
+Sending raw data without Serialization
+---------------------------------------
+
+In some cases, you don't need your message data to be serialized. If you
+pass in a plain string or unicode object as your message, then carrot will
+not waste cycles serializing/deserializing the data.
+
+You can optionally specify a ``content_type`` and ``content_encoding``
+for the raw data:
+
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed",
+ routing_key="import_pictures")
+ >>> publisher.send(open('~/my_picture.jpg','rb').read(),
+ content_type="image/jpeg",
+ content_encoding="binary")
+ >>> publisher.close()
+
+The ``message`` object returned by the ``Consumer`` class will have a
+``content_type`` and ``content_encoding`` attribute.
+
+
+Receiving messages without a callback
+--------------------------------------
+
+You can also poll the queue manually, by using the ``fetch`` method.
+This method returns a ``Message`` object, from where you can get the
+message body, de-serialize the body to get the data, acknowledge, reject or
+re-queue the message.
+
+ >>> consumer = Consumer(connection=conn, queue="feed",
+ ... exchange="feed", routing_key="importer")
+ >>> message = consumer.fetch()
+ >>> if message:
+ ... message_data = message.payload
+ ... message.ack()
+ ... else:
+ ... # No messages waiting on the queue.
+ >>> consumer.close()
+
+Sub-classing the messaging classes
+----------------------------------
+
+The ``Consumer``, and ``Publisher`` classes can also be sub classed. Thus you
+can define the above publisher and consumer like so:
+
+ >>> from carrot.messaging import Publisher, Consumer
+
+ >>> class FeedPublisher(Publisher):
+ ... exchange = "feed"
+ ... routing_key = "importer"
+ ...
+ ... def import_feed(self, feed_url):
+ ... return self.send({"action": "import_feed",
+ ... "feed_url": feed_url})
+
+ >>> class FeedConsumer(Consumer):
+ ... queue = "feed"
+ ... exchange = "feed"
+ ... routing_key = "importer"
+ ...
+ ... def receive(self, message_data, message):
+ ... action = message_data["action"]
+ ... if action == "import_feed":
+ ... # something importing this feed
+ ... # import_feed(message_data["feed_url"])
+ message.ack()
+ ... else:
+ ... raise Exception("Unknown action: %s" % action)
+
+ >>> publisher = FeedPublisher(connection=conn)
+ >>> publisher.import_feed("http://cnn.com/rss/edition.rss")
+ >>> publisher.close()
+
+ >>> consumer = FeedConsumer(connection=conn)
+ >>> consumer.wait() # Go into the consumer loop.
+
+Getting Help
+============
+
+Mailing list
+------------
+
+Join the `carrot-users`_ mailing list.
+
+.. _`carrot-users`: http://groups.google.com/group/carrot-users/
+
+Bug tracker
+===========
+
+If you have any suggestions, bug reports or annoyances please report them
+to our issue tracker at http://github.com/ask/carrot/issues/
+
+Contributing
+============
+
+Development of ``carrot`` happens at Github: http://github.com/ask/carrot
+
+You are highly encouraged to participate in the development. If you don't
+like Github (for some reason) you're welcome to send regular patches.
+
+License
+=======
+
+This software is licensed under the ``New BSD License``. See the ``LICENSE``
+file in the top distribution directory for the full license text.
diff --git a/THANKS b/THANKS
new file mode 100644
index 00000000..6757ee47
--- /dev/null
+++ b/THANKS
@@ -0,0 +1,6 @@
+Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
+Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
+Thanks to Martin Mahner for the Sphinx theme.
+Thanks to jcater for bug reports.
+Thanks to sebest for bug reports.
+Thanks to greut for bug reports
diff --git a/TODO b/TODO
new file mode 100644
index 00000000..992504b8
--- /dev/null
+++ b/TODO
@@ -0,0 +1,2 @@
+Please see our Issue Tracker at GitHub:
+ http://github.com/ask/kombu/issues
diff --git a/contrib/doc2ghpages b/contrib/doc2ghpages
new file mode 100755
index 00000000..5ebc7aaa
--- /dev/null
+++ b/contrib/doc2ghpages
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+git checkout master
+(cd docs;
+ rm -rf .build;
+ make html;
+ (cd .build/html;
+ sphinx-to-github;))
+git checkout gh-pages
+cp -r docs/.build/html/* .
+git commit . -m "Autogenerated documentation for github."
+git push origin gh-pages
+git checkout master
diff --git a/contrib/requirements/default.txt b/contrib/requirements/default.txt
new file mode 100644
index 00000000..63c3428e
--- /dev/null
+++ b/contrib/requirements/default.txt
@@ -0,0 +1,2 @@
+anyjson
+amqplib>=0.6
diff --git a/contrib/requirements/test.txt b/contrib/requirements/test.txt
new file mode 100644
index 00000000..bf222f70
--- /dev/null
+++ b/contrib/requirements/test.txt
@@ -0,0 +1,7 @@
+nose
+nose-cover3
+coverage>=3.0
+simplejson
+PyYAML
+msgpack-python
+
diff --git a/docs/rewrite.rst b/docs/rewrite.rst
new file mode 100644
index 00000000..c9bbfed6
--- /dev/null
+++ b/docs/rewrite.rst
@@ -0,0 +1,41 @@
+
+
+.. code-block:: python
+
+ from carrot import Connection, Exchange, Binding
+ from carrot import Consumer, Producer
+
+ media_exchange = Exchange("media", "direct", durable=True)
+ video_binding = Binding("video", exchange=media_exchange, key="video")
+
+ # connections/channels
+ connection = Connection("localhost", "guest", "guest", "/")
+ channel = connection.channel()
+
+ # produce
+ producer = Producer(channel, exchange=media_exchange, serializer="json")
+ producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013})
+
+ # consume
+ consumer = Consumer(channel, video_binding)
+ consumer.register_callback(process_media)
+ consumer.consume()
+
+ while True:
+ connection.drain_events()
+
+
+ # consumerset:
+ video_binding = Binding("video", exchange=media_exchange, key="video")
+ image_binding = Binding("image", exchange=media_exchange, key="image")
+
+ consumer = Consumer(channel, [video_binding, image_binding])
+
+
+
+
+
+
+
+
+
diff --git a/kombu/__init__.py b/kombu/__init__.py
new file mode 100644
index 00000000..d88bc9cc
--- /dev/null
+++ b/kombu/__init__.py
@@ -0,0 +1,7 @@
+"""AMQP Messaging Framework for Python"""
+VERSION = (0, 10, 5)
+__version__ = ".".join(map(str, VERSION))
+__author__ = "Ask Solem"
+__contact__ = "askh@opera.com"
+__homepage__ = "http://github.com/ask/carrot/"
+__docformat__ = "restructuredtext"
diff --git a/kombu/backends/__init__.py b/kombu/backends/__init__.py
new file mode 100644
index 00000000..e2263e60
--- /dev/null
+++ b/kombu/backends/__init__.py
@@ -0,0 +1,43 @@
+import sys
+
+from kombu.utils import rpartition
+
+DEFAULT_BACKEND = "kombu.backends.pyamqplib.Backend"
+
+BACKEND_ALIASES = {
+ "amqplib": "kombu.backends.pyamqplib.Backend",
+ "pika": "kombu.backends.pikachu.AsyncoreBackend",
+ "syncpika": "kombu.backends.pikachu.SyncBackend",
+}
+
+_backend_cache = {}
+
+
+def resolve_backend(backend=None):
+ backend = BACKEND_ALIASES.get(backend, backend)
+ backend_module_name, _, backend_cls_name = rpartition(backend, ".")
+ return backend_module_name, backend_cls_name
+
+
+def _get_backend_cls(backend=None):
+ backend_module_name, backend_cls_name = resolve_backend(backend)
+ __import__(backend_module_name)
+ backend_module = sys.modules[backend_module_name]
+ return getattr(backend_module, backend_cls_name)
+
+
+def get_backend_cls(backend=None):
+ """Get backend class by name.
+
+ The backend string is the full path to a backend class, e.g.::
+
+ "kombu.backends.pyamqplib.Backend"
+
+ If the name does not include "``.``" (is not fully qualified),
+ the alias table will be consulted.
+
+ """
+ backend = backend or DEFAULT_BACKEND
+ if backend not in _backend_cache:
+ _backend_cache[backend] = _get_backend_cls(backend)
+ return _backend_cache[backend]
diff --git a/kombu/backends/base.py b/kombu/backends/base.py
new file mode 100644
index 00000000..4413e119
--- /dev/null
+++ b/kombu/backends/base.py
@@ -0,0 +1,104 @@
+"""
+
+Backend base classes.
+
+"""
+from kombu import serialization
+
+ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"])
+
+
+class MessageStateError(Exception):
+ """The message has already been acknowledged."""
+
+
+class BaseMessage(object):
+ """Base class for received messages."""
+ _state = None
+
+ MessageStateError = MessageStateError
+
+ def __init__(self, channel, body=None, delivery_tag=None,
+ content_type=None, content_encoding=None, delivery_info={}, **kwargs):
+ self.channel = channel
+ self._decoded_cache = None
+ self._state = "RECEIVED"
+
+ def decode(self):
+ """Deserialize the message body, returning the original
+ python structure sent by the publisher."""
+ return serialization.decode(self.body, self.content_type,
+ self.content_encoding)
+
+ @property
+ def payload(self):
+ """The decoded message."""
+ if not self._decoded_cache:
+ self._decoded_cache = self.decode()
+ return self._decoded_cache
+
+ def ack(self):
+ """Acknowledge this message as being processed.,
+ This will remove the message from the queue.
+
+ :raises MessageStateError: If the message has already been
+ acknowledged/requeued/rejected.
+
+ """
+ if self.acknowledged:
+ raise self.MessageStateError(
+ "Message already acknowledged with state: %s" % self._state)
+ self.channel.basic_ack(self.delivery_tag)
+ self._state = "ACK"
+
+ def reject(self):
+ """Reject this message.
+
+ The message will be discarded by the server.
+
+ :raises MessageStateError: If the message has already been
+ acknowledged/requeued/rejected.
+
+ """
+ if self.acknowledged:
+ raise self.MessageStateError(
+ "Message already acknowledged with state: %s" % self._state)
+ self.channel.basic_reject(self.delivery_tag)
+ self._state = "REJECTED"
+
+ def requeue(self):
+ """Reject this message and put it back on the queue.
+
+ You must not use this method as a means of selecting messages
+ to process.
+
+ :raises MessageStateError: If the message has already been
+ acknowledged/requeued/rejected.
+
+ """
+ if self.acknowledged:
+ raise self.MessageStateError(
+ "Message already acknowledged with state: %s" % self._state)
+ self.channel.basic_reject(self.delivery_tag, requeue=True)
+ self._state = "REQUEUED"
+
+ @property
+ def acknowledged(self):
+ return self._state in ACKNOWLEDGED_STATES
+
+
+class BaseBackend(object):
+ """Base class for backends."""
+ default_port = None
+
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+
+ def get_channel(self):
+ raise NotImplementedError("Subclass responsibility")
+
+ def establish_connection(self):
+ raise NotImplementedError("Subclass responsibility")
+
+ def close_connection(self):
+ raise NotImplementedError("Subclass responsibility")
diff --git a/kombu/backends/emulation.py b/kombu/backends/emulation.py
new file mode 100644
index 00000000..600366fa
--- /dev/null
+++ b/kombu/backends/emulation.py
@@ -0,0 +1,244 @@
+from carrot.backends.base import BaseBackend, BaseMessage
+from anyjson import serialize, deserialize
+from itertools import count
+from django.utils.datastructures import SortedDict
+from carrot.utils import gen_unique_id
+import sys
+import time
+import atexit
+
+from Queue import Empty as QueueEmpty
+from itertools import cycle
+
+
+class QueueSet(object):
+ """A set of queues that operates as one."""
+
+ def __init__(self, backend, queues):
+ self.backend = backend
+ self.queues = queues
+
+ # an infinite cycle through all the queues.
+ self.cycle = cycle(self.queues)
+
+ # A set of all the queue names, so we can match when we've
+ # tried all of them.
+ self.all = frozenset(self.queues)
+
+ def get(self):
+ """Get the next message avaiable in the queue.
+
+ :returns: The message and the name of the queue it came from as
+ a tuple.
+ :raises Empty: If there are no more items in any of the queues.
+
+ """
+
+ # A set of queues we've already tried.
+ tried = set()
+
+ while True:
+ # Get the next queue in the cycle, and try to get an item off it.
+ queue = self.cycle.next()
+ try:
+ item = self.backend._get(queue)
+ except Empty:
+ # raises Empty when we've tried all of them.
+ tried.add(queue)
+ if tried == self.all:
+ raise
+ else:
+ return item, queue
+
+ def __repr__(self):
+ return "<QueueSet: %s>" % repr(self.queue_names)
+
+
+class QualityOfService(object):
+
+ def __init__(self, resource, prefetch_count=None, interval=None):
+ self.resource = resource
+ self.prefetch_count = prefetch_count
+ self.interval = interval
+ self._delivered = SortedDict()
+ self._restored_once = False
+ atexit.register(self.restore_unacked_once)
+
+ def can_consume(self):
+ return len(self._delivered) > self.prefetch_count
+
+ def append(self, message, queue_name, delivery_tag):
+ self._delivered[delivery_tag] = message, queue_name
+
+ def ack(self, delivery_tag):
+ self._delivered.pop(delivery_tag, None)
+
+ def restore_unacked(self):
+ for message, queue_name in self._delivered.items():
+ self.resource.put(queue_name, message)
+ self._delivered = SortedDict()
+
+ def requeue(self, delivery_tag):
+ try:
+ message, queue_name = self._delivered.pop(delivery_tag)
+ except KeyError:
+ pass
+ self.resource.put(queue_name, message)
+
+ def restore_unacked_once(self):
+ if not self._restored_once:
+ if self._delivered:
+ sys.stderr.write(
+ "Restoring unacknowledged messages: %s\n" % (
+ self._delivered))
+ self.restore_unacked()
+ if self._delivered:
+ sys.stderr.write("UNRESTORED MESSAGES: %s\n" % (
+ self._delivered))
+
+
+class Message(BaseMessage):
+
+ def __init__(self, backend, payload, **kwargs):
+ self.backend = backend
+
+ payload = deserialize(payload)
+ kwargs["body"] = payload.get("body").encode("utf-8")
+ kwargs["delivery_tag"] = payload.get("delivery_tag")
+ kwargs["content_type"] = payload.get("content-type")
+ kwargs["content_encoding"] = payload.get("content-encoding")
+ kwargs["priority"] = payload.get("priority")
+ self.destination = payload.get("destination")
+
+ super(Message, self).__init__(backend, **kwargs)
+
+ def reject(self):
+ raise NotImplementedError(
+ "This backend does not implement basic.reject")
+
+
+class EmulationBase(BaseBackend):
+ Message = Message
+ default_port = None
+ interval = 1
+ _prefetch_count = None
+
+ QueueSet = QueueSet
+
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ self._consumers = {}
+ self._callbacks = {}
+ self._qos_manager = None
+
+ def establish_connection(self):
+ return self # for drain events
+
+ def close_connection(self, connection):
+ pass
+
+ def queue_exists(self, queue):
+ return True
+
+ def queue_purge(self, queue, **kwargs):
+ return self._purge(queue, **kwargs)
+
+ def _poll(self, resource):
+ while True:
+ if self.qos_manager.can_consume():
+ try:
+ return resource.get()
+ except QueueEmpty:
+ pass
+ time.sleep(self.interval)
+
+ def declare_consumer(self, queue, no_ack, callback, consumer_tag,
+ **kwargs):
+ self._consumers[consumer_tag] = queue
+ self._callbacks[queue] = callback
+
+ def drain_events(self, timeout=None):
+ queueset = self.QueueSet(self._consumers.values())
+ payload, queue = self._poll(queueset)
+
+ if not queue or queue not in self._callbacks:
+ return
+
+ self._callbacks[queue](payload)
+
+ def consume(self, limit=None):
+ for total_message_count in count():
+ if limit and total_message_count >= limit:
+ raise StopIteration
+
+ self.drain_events()
+
+ yield True
+
+ def queue_declare(self, queue, *args, **kwargs):
+ pass
+
+ def _get_many(self, queues):
+ raise NotImplementedError("Emulations must implement _get_many")
+
+ def _get(self, queue):
+ raise NotImplementedError("Emulations must implement _get")
+
+ def _put(self, queue, message):
+ raise NotImplementedError("Emulations must implement _put")
+
+ def _purge(self, queue, message):
+ raise NotImplementedError("Emulations must implement _purge")
+
+ def get(self, queue, **kwargs):
+ try:
+ payload = self._get(queue)
+ except QueueEmpty:
+ return None
+ else:
+ return self.message_to_python(payload)
+
+ def ack(self, delivery_tag):
+ self.qos_manager.ack(delivery_tag)
+
+ def requeue(self, delivery_tag):
+ self.qos_manager.requeue(delivery_tag)
+
+ def message_to_python(self, raw_message):
+ message = self.Message(backend=self, payload=raw_message)
+ self.qos_manager.append(message, message.destination,
+ message.delivery_tag)
+ return message
+
+ def prepare_message(self, message_data, delivery_mode, priority=0,
+ content_type=None, content_encoding=None):
+ return {"body": message_data,
+ "priority": priority or 0,
+ "content-encoding": content_encoding,
+ "content-type": content_type}
+
+ def publish(self, message, exchange, routing_key, **kwargs):
+ message["destination"] = exchange
+ self._put(exchange, message)
+
+ def cancel(self, consumer_tag):
+ queue = self._consumers.pop(consumer_tag, None)
+ self._callbacks.pop(queue, None)
+
+ def close(self):
+ for consumer_tag in self._consumers.keys():
+ self.cancel(consumer_tag)
+
+ def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
+ self._prefetch_count = prefetch_count
+
+ @property
+ def qos_manager(self):
+ if self._qos_manager is None:
+ self._qos_manager = QualityOfService(self)
+
+ # Update prefetch count / interval
+ self._qos_manager.prefetch_count = self._prefetch_count
+ self._qos_manager.interval = self.interval
+
+ return self._qos_manager
diff --git a/kombu/backends/pikachu.py b/kombu/backends/pikachu.py
new file mode 100644
index 00000000..507bbbfb
--- /dev/null
+++ b/kombu/backends/pikachu.py
@@ -0,0 +1,212 @@
+import asyncore
+import weakref
+import functools
+import itertools
+
+import pika
+
+from carrot.backends.base import BaseMessage, BaseBackend
+
+DEFAULT_PORT = 5672
+
+
+class Message(BaseMessage):
+
+ def __init__(self, backend, amqp_message, **kwargs):
+ channel, method, header, body = amqp_message
+ self._channel = channel
+ self._method = method
+ self._header = header
+ self.backend = backend
+
+ kwargs.update({"body": body,
+ "delivery_tag": method.delivery_tag,
+ "content_type": header.content_type,
+ "content_encoding": header.content_encoding,
+ "delivery_info": dict(
+ consumer_tag=method.consumer_tag,
+ routing_key=method.routing_key,
+ delivery_tag=method.delivery_tag,
+ exchange=method.exchange)})
+
+ super(Message, self).__init__(backend, **kwargs)
+
+
+class SyncBackend(BaseBackend):
+ default_port = DEFAULT_PORT
+ _connection_cls = pika.BlockingConnection
+
+ Message = Message
+
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ self.default_port = kwargs.get("default_port", self.default_port)
+ self._channel_ref = None
+
+ @property
+ def _channel(self):
+ return callable(self._channel_ref) and self._channel_ref()
+
+ @property
+ def channel(self):
+ """If no channel exists, a new one is requested."""
+ if not self._channel:
+ self._channel_ref = weakref.ref(self.connection.get_channel())
+ return self._channel
+
+ def establish_connection(self):
+ """Establish connection to the AMQP broker."""
+ conninfo = self.connection
+ if not conninfo.port:
+ conninfo.port = self.default_port
+ credentials = pika.PlainCredentials(conninfo.userid,
+ conninfo.password)
+ return self._connection_cls(pika.ConnectionParameters(
+ conninfo.hostname,
+ port=conninfo.port,
+ virtual_host=conninfo.virtual_host,
+ credentials=credentials))
+
+ def close_connection(self, connection):
+ """Close the AMQP broker connection."""
+ connection.close()
+
+ def queue_exists(self, queue):
+ return False # FIXME
+
+ def queue_delete(self, queue, if_unused=False, if_empty=False):
+ """Delete queue by name."""
+ return self.channel.queue_delete(queue=queue, if_unused=if_unused,
+ if_empty=if_empty)
+
+ def queue_purge(self, queue, **kwargs):
+ """Discard all messages in the queue. This will delete the messages
+ and results in an empty queue."""
+ return self.channel.queue_purge(queue=queue).message_count
+
+ def queue_declare(self, queue, durable, exclusive, auto_delete,
+ warn_if_exists=False, arguments=None):
+ """Declare a named queue."""
+
+ return self.channel.queue_declare(queue=queue,
+ durable=durable,
+ exclusive=exclusive,
+ auto_delete=auto_delete,
+ arguments=arguments)
+
+ def exchange_declare(self, exchange, type, durable, auto_delete):
+ """Declare an named exchange."""
+ return self.channel.exchange_declare(exchange=exchange,
+ type=type,
+ durable=durable,
+ auto_delete=auto_delete)
+
+ def queue_bind(self, queue, exchange, routing_key, arguments={}):
+ """Bind queue to an exchange using a routing key."""
+ if not arguments:
+ arguments = {}
+ return self.channel.queue_bind(queue=queue,
+ exchange=exchange,
+ routing_key=routing_key,
+ arguments=arguments)
+
+ def message_to_python(self, raw_message):
+ """Convert encoded message body back to a Python value."""
+ return self.Message(backend=self, amqp_message=raw_message)
+
+ def get(self, queue, no_ack=False):
+ """Receive a message from a declared queue by name.
+
+ :returns: A :class:`Message` object if a message was received,
+ ``None`` otherwise. If ``None`` was returned, it probably means
+ there was no messages waiting on the queue.
+
+ """
+ raw_message = self.channel.basic_get(queue, no_ack=no_ack)
+ if not raw_message:
+ return None
+ return self.message_to_python(raw_message)
+
+ def declare_consumer(self, queue, no_ack, callback, consumer_tag,
+ nowait=False):
+ """Declare a consumer."""
+
+ @functools.wraps(callback)
+ def _callback_decode(channel, method, header, body):
+ return callback((channel, method, header, body))
+
+ return self.channel.basic_consume(_callback_decode,
+ queue=queue,
+ no_ack=no_ack,
+ consumer_tag=consumer_tag)
+
+ def consume(self, limit=None):
+ """Returns an iterator that waits for one message at a time."""
+ for total_message_count in itertools.count():
+ if limit and total_message_count >= limit:
+ raise StopIteration
+ self.connection.connection.drain_events()
+ yield True
+
+ def cancel(self, consumer_tag):
+ """Cancel a channel by consumer tag."""
+ if not self._channel:
+ return
+ self.channel.basic_cancel(consumer_tag)
+
+ def close(self):
+ """Close the channel if open."""
+ if self._channel and not self._channel.handler.channel_close:
+ self._channel.close()
+ self._channel_ref = None
+
+ def ack(self, delivery_tag):
+ """Acknowledge a message by delivery tag."""
+ return self.channel.basic_ack(delivery_tag)
+
+ def reject(self, delivery_tag):
+ """Reject a message by deliver tag."""
+ return self.channel.basic_reject(delivery_tag, requeue=False)
+
+ def requeue(self, delivery_tag):
+ """Reject and requeue a message by delivery tag."""
+ return self.channel.basic_reject(delivery_tag, requeue=True)
+
+ def prepare_message(self, message_data, delivery_mode, priority=None,
+ content_type=None, content_encoding=None):
+ """Encapsulate data into a AMQP message."""
+ properties = pika.BasicProperties(priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ delivery_mode=delivery_mode)
+ return message_data, properties
+
+ def publish(self, message, exchange, routing_key, mandatory=None,
+ immediate=None, headers=None):
+ """Publish a message to a named exchange."""
+ body, properties = message
+
+ if headers:
+ properties.headers = headers
+
+ ret = self.channel.basic_publish(body=body,
+ properties=properties,
+ exchange=exchange,
+ routing_key=routing_key,
+ mandatory=mandatory,
+ immediate=immediate)
+ if mandatory or immediate:
+ self.close()
+
+ def qos(self, prefetch_size, prefetch_count, apply_global=False):
+ """Request specific Quality of Service."""
+ self.channel.basic_qos(prefetch_size, prefetch_count,
+ apply_global)
+
+ def flow(self, active):
+ """Enable/disable flow from peer."""
+ self.channel.flow(active)
+
+
+class AsyncoreBackend(SyncBackend):
+ _connection_cls = pika.AsyncoreConnection
diff --git a/kombu/backends/pyamqplib.py b/kombu/backends/pyamqplib.py
new file mode 100644
index 00000000..dbc6df69
--- /dev/null
+++ b/kombu/backends/pyamqplib.py
@@ -0,0 +1,202 @@
+"""
+
+`amqplib`_ backend for carrot.
+
+.. _`amqplib`: http://barryp.org/software/py-amqplib/
+
+"""
+import warnings
+import weakref
+
+from itertools import count
+
+from amqplib import client_0_8 as amqp
+from amqplib.client_0_8.exceptions import AMQPChannelException
+
+from kombu.backends.base import BaseMessage, BaseBackend
+
+DEFAULT_PORT = 5672
+
+class Connection(amqp.Connection):
+
+ def drain_events(self, allowed_methods=None, timeout=None):
+ """Wait for an event on any channel."""
+ return self.wait_multi(self.channels.values(), timeout=timeout)
+
+ def wait_multi(self, channels, allowed_methods=None, timeout=None):
+ """Wait for an event on a channel."""
+ chanmap = dict((chan.channel_id, chan) for chan in channels)
+ chanid, method_sig, args, content = self._wait_multiple(
+ chanmap.keys(), allowed_methods, timeout=timeout)
+
+ channel = chanmap[chanid]
+
+ if content \
+ and channel.auto_decode \
+ and hasattr(content, 'content_encoding'):
+ try:
+ content.body = content.body.decode(content.content_encoding)
+ except Exception:
+ pass
+
+ amqp_method = channel._METHOD_MAP.get(method_sig, None)
+
+ if amqp_method is None:
+ raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
+
+ if content is None:
+ return amqp_method(channel, args)
+ else:
+ return amqp_method(channel, args, content)
+
+ def read_timeout(self, timeout=None):
+ if timeout is None:
+ return self.method_reader.read_method()
+ sock = self.transport.sock
+ prev = sock.gettimeout()
+ sock.settimeout(timeout)
+ try:
+ return self.method_reader.read_method()
+ finally:
+ sock.settimeout(prev)
+
+ def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
+ for channel_id in channel_ids:
+ method_queue = self.channels[channel_id].method_queue
+ for queued_method in method_queue:
+ method_sig = queued_method[0]
+ if (allowed_methods is None) \
+ or (method_sig in allowed_methods) \
+ or (method_sig == (20, 40)):
+ method_queue.remove(queued_method)
+ method_sig, args, content = queued_method
+ return channel_id, method_sig, args, content
+
+ # Nothing queued, need to wait for a method from the peer
+ while True:
+ channel, method_sig, args, content = self.read_timeout(timeout)
+
+ if (channel in channel_ids) \
+ and ((allowed_methods is None) \
+ or (method_sig in allowed_methods) \
+ or (method_sig == (20, 40))):
+ return channel, method_sig, args, content
+
+ # Not the channel and/or method we were looking for. Queue
+ # this method for later
+ self.channels[channel].method_queue.append((method_sig,
+ args,
+ content))
+
+ #
+ # If we just queued up a method for channel 0 (the Connection
+ # itself) it's probably a close method in reaction to some
+ # error, so deal with it right away.
+ #
+ if channel == 0:
+ self.wait()
+
+ def channel(self, channel_id=None):
+ try:
+ return self.channels[channel_id]
+ except KeyError:
+ return Channel(self, channel_id)
+
+
+class Message(BaseMessage):
+ """A message received by the broker.
+
+ Usually you don't insantiate message objects yourself, but receive
+ them using a :class:`carrot.messaging.Consumer`.
+
+ :param backend: see :attr:`backend`.
+ :param amqp_message: see :attr:`_amqp_message`.
+
+
+ .. attribute:: body
+
+ The message body.
+
+ .. attribute:: delivery_tag
+
+ The message delivery tag, uniquely identifying this message.
+
+ .. attribute:: backend
+
+ The message backend used.
+ A subclass of :class:`carrot.backends.base.BaseBackend`.
+
+ .. attribute:: _amqp_message
+
+ A :class:`amqplib.client_0_8.basic_message.Message` instance.
+ This is a private attribute and should not be accessed by
+ production code.
+
+ """
+
+ def __init__(self, channel, amqp_message, **kwargs):
+ self._amqp_message = amqp_message
+ self.channel = channel
+
+ for attr_name in ("body",
+ "delivery_tag",
+ "content_type",
+ "content_encoding",
+ "delivery_info"):
+ kwargs[attr_name] = getattr(amqp_message, attr_name, None)
+
+ super(Message, self).__init__(backend, **kwargs)
+
+
+class Channel(amqp.Channel):
+ Message = Message
+
+ def prepare_message(self, message_data, priority=None,
+ content_type=None, content_encoding=None, headers=None,
+ properties=None):
+ """Encapsulate data into a AMQP message."""
+ return amqp.Message(message_data, priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ properties=properties)
+
+ def message_to_python(self, raw_message):
+ """Convert encoded message body back to a Python value."""
+ return self.Message(channel=self, amqp_message=raw_message)
+
+
+class Backend(BaseBackend):
+ default_port = DEFAULT_PORT
+
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ self.default_port = kwargs.get("default_port") or self.default_port
+
+ def get_channel(self, connection):
+ return connection.channel()
+
+ def drain_events(self, connection, **kwargs):
+ return connection.drain_events(**kwargs)
+
+ def establish_connection(self):
+ """Establish connection to the AMQP broker."""
+ conninfo = self.connection
+ if not conninfo.hostname:
+ raise KeyError("Missing hostname for AMQP connection.")
+ if conninfo.userid is None:
+ raise KeyError("Missing user id for AMQP connection.")
+ if conninfo.password is None:
+ raise KeyError("Missing password for AMQP connection.")
+ if not conninfo.port:
+ conninfo.port = self.default_port
+ return Connection(host=conninfo.host,
+ userid=conninfo.userid,
+ password=conninfo.password,
+ virtual_host=conninfo.virtual_host,
+ insist=conninfo.insist,
+ ssl=conninfo.ssl,
+ connect_timeout=conninfo.connect_timeout)
+
+ def close_connection(self, connection):
+ """Close the AMQP broker connection."""
+ connection.close()
diff --git a/kombu/connection.py b/kombu/connection.py
new file mode 100644
index 00000000..64727dc1
--- /dev/null
+++ b/kombu/connection.py
@@ -0,0 +1,79 @@
+from kombu.backends import get_backend_cls
+
+
+class BrokerConnection(object):
+
+ def __init__(self, hostname="localhost", userid="guest",
+ password="guest", virtual_host="/", port=None, **kwargs):
+ self.hostname = hostname
+ self.userid = userid
+ self.password = password
+ self.virtual_host = virtual_host or self.virtual_host
+ self.port = port or self.port
+ self.insist = kwargs.get("insist", False)
+ self.connect_timeout = kwargs.get("connect_timeout", 5)
+ self.ssl = kwargs.get("ssl", False)
+ self.backend_cls = kwargs.get("backend_cls", None)
+ self._closed = None
+ self._connection = None
+ self._backend = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, e_type, e_value, e_trace):
+ self.close()
+
+ def _establish_connection(self):
+ return self.backend.establish_connection()
+
+ @property
+ def connection(self):
+ if self._closed:
+ return
+ if not self._connection:
+ self._connection = self._establish_connection()
+ self._closed = False
+ return self._connection
+
+ @property
+ def host(self):
+ """The host as a hostname/port pair separated by colon."""
+ return ":".join([self.hostname, str(self.port)])
+
+ def get_backend_cls(self):
+ """Get the currently used backend class."""
+ backend_cls = self.backend_cls
+ if not backend_cls or isinstance(backend_cls, basestring):
+ backend_cls = get_backend_cls(backend_cls)
+ return backend_cls
+
+ def connect(self):
+ """Establish a connection to the AMQP server."""
+ self._closed = False
+ return self.connection
+
+ def channel(self):
+ """Request a new AMQP channel."""
+ return self.backend.create_channel(self.connection)
+
+ def drain_events(self, **kwargs):
+ return self.backend.drain_events(self.connection, **kwargs)
+
+ def close(self):
+ """Close the currently open connection."""
+ try:
+ if self._connection:
+ self.backend.close_connection(self._connection)
+ except socket.error:
+ pass
+ self._closed = True
+
+ def _create_backend(self):
+ return self.get_backend_cls()(connection=self)
+
+ @property
+ def backend(self):
+ if self._backend is None:
+ self._backend = self._create_backend()
+ return self._backend
diff --git a/kombu/entity.py b/kombu/entity.py
new file mode 100644
index 00000000..c2269a23
--- /dev/null
+++ b/kombu/entity.py
@@ -0,0 +1,116 @@
+class Exchange(object):
+ TRANSIENT_DELIVERY_MODE = 1
+ PERSISTENT_DELIVERY_MODE = 2
+ DELIVERY_MODES = {
+ "transient": TRANSIENT_DELIVERY_MODE,
+ "persistent": PERSISTENT_DELIVERY_MODE,
+ }
+ name = ""
+ type = "direct"
+ routing_key = ""
+ delivery_mode = PERSISTENT_DELIVERY_MODE
+ durable = True
+ auto_delete = False
+ _init_opts = ("durable", "auto_delete",
+ "delivery_mode", "auto_declare")
+
+ def __init__(self, name="", type="", routing_key=None, **kwargs):
+ self.name = name or self.name
+ self.type = type or self.type
+ self.routing_key = routing_key or self.routing_key
+ for opt_name in self._init_opts:
+ opt_value = kwargs.get(opt_name)
+ if opt_value is not None:
+ setattr(self, opt_name, opt_value)
+ self.delivery_mode = self.DELIVERY_MODES.get(self.delivery_mode,
+ self.delivery_mode)
+
+ def declare(self, channel):
+ """Declare the exchange.
+
+ Creates the exchange on the broker.
+
+ """
+ channel.exchange_declare(exchange=self.name,
+ type=self.type,
+ durable=self.durable,
+ auto_delete=self.auto_delete)
+
+ def create_message(self, channel, message_data, delivery_mode=None,
+ priority=None, content_type=None, content_encoding=None,
+ properties=None):
+ properties["delivery_mode"] = delivery_mode or self.delivery_mode
+ return channel.prepare_message(message_data,
+ properties=properties,
+ priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding)
+
+ def publish(self, channel, message, routing_key=None,
+ mandatory=False, immediate=False):
+ if routing_key is None:
+ routing_key = self.routing_key
+ channel.basic_publish(message,
+ exchange=self.name, routing_key=routing_key,
+ mandatory=mandatory, immediate=immediate,
+ headers=headers)
+
+
+class Binding(object):
+ name = ""
+ exchange = None
+ routing_key = ""
+
+ durable = True
+ exclusive = False
+ auto_delete = False
+ warn_if_exists = False
+ _init_opts = ("durable", "exclusive", "auto_delete",
+ "warn_if_exists")
+
+ def __init__(self, name=None, exchange=None, routing_key=None, **kwargs):
+ # Binding.
+ self.name = name or self.name
+ self.exchange = exchange or self.exchange
+ self.routing_key = routing_key or self.routing_key
+
+ # Options
+ for opt_name in self._init_opts:
+ opt_value = kwargs.get(opt_name)
+ if opt_value is not None:
+ setattr(self, opt_name, opt_value)
+
+ # exclusive implies auto-delete.
+ if self.exclusive:
+ self.auto_delete = True
+
+ def declare(self, channel):
+ """Declares the queue, the exchange and binds the queue to
+ the exchange."""
+ if self.exchange:
+ self.exchange.declare(channel)
+ if self.name:
+ channel.queue_declare(queue=self.name, durable=self.durable,
+ exclusive=self.exclusive,
+ auto_delete=self.auto_delete)
+ channel.queue_bind(queue=self.name,
+ exchange=self.exchange.name,
+ routing_key=self.routing_key)
+
+ def get(self, channel, no_ack=None):
+ message = channel.basic_get(self.name, no_ack=no_ack)
+ if message:
+ return channel.message_to_python(message)
+
+ def purge(self, channel):
+ return channel.queue_purge(self.name)
+
+ def consume(self, channel, consumer_tag, callback, no_ack=None,
+ nowait=True):
+ return channel.consume(queue=self.name, no_ack=no_ack,
+ consumer_tag=consumer_tag,
+ callback=callback,
+ nowait=nowait)
+
+ def cancel(self, channel, consumer_tag):
+ channel.basic_cancel(consumer_tag)
diff --git a/kombu/messaging.py b/kombu/messaging.py
new file mode 100644
index 00000000..cdfae69a
--- /dev/null
+++ b/kombu/messaging.py
@@ -0,0 +1,136 @@
+from itertools import count
+
+from kombu import serialization
+from kombu.entity import Exchange, Binding
+
+
+class Producer(object):
+ exchange = None
+ serializer = None
+ auto_declare = True
+
+ def __init__(self, channel, exchange=None, serializer=None,
+ auto_declare=None):
+ self.channel = channel
+ self.exchange = exchange or self.exchange
+ self.serializer = serializer or self.serializer
+ if auto_declare is not None:
+ self.auto_declare = auto_declare
+
+ if self.exchange and self.auto_declare:
+ self.exchange.declare(self.channel)
+
+ def prepare(self, message_data, serializer=None,
+ content_type=None, content_encoding=None):
+ # No content_type? Then we're serializing the data internally.
+ if not content_type:
+ serializer = serializer or self.serializer
+ (content_type, content_encoding,
+ message_data) = serialization.encode(message_data,
+ serializer=serializer)
+ else:
+ # If the programmer doesn't want us to serialize,
+ # make sure content_encoding is set.
+ if isinstance(message_data, unicode):
+ if not content_encoding:
+ content_encoding = 'utf-8'
+ message_data = message_data.encode(content_encoding)
+
+ # If they passed in a string, we can't know anything
+ # about it. So assume it's binary data.
+ elif not content_encoding:
+ content_encoding = 'binary'
+
+ return message_data, content_type, content_encoding
+
+ def publish(self, message_data, routing_key=None, delivery_mode=None,
+ mandatory=False, immediate=False, priority=0, content_type=None,
+ content_encoding=None, serializer=None):
+
+ message_data, content_type, content_encoding = self.prepare(
+ message_data, content_type, content_encoding)
+ message = self.exchange.create_message(channel, message_data,
+ delivery_mode, priority, content_type, content_encoding)
+ return self.exchange.publish(message, routing_key, mandatory,
+ immediate)
+
+
+_next_tag = count(1).next
+
+class Consumer(object):
+ no_ack = False
+ auto_declare = True
+ callbacks = None
+
+ def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
+ callbacks=None):
+ self.channel = channel
+ self.bindings = bindings
+ if no_ack is not None:
+ self.no_ack = no_ack
+ if auto_declare is not None:
+ self.auto_declare = auto_declare
+
+ if callbacks is not None:
+ self.callbacks = callbacks
+ if self.callbacks is None:
+ self.callbacks = []
+
+ # bindings maybe list
+ if not hasattr(self.bindings, "__iter__"):
+ self.bindings = [self.bindings]
+
+ if self.auto_declare:
+ for binding in self.bindings:
+ binding.declare(self.channel)
+
+ self._active_tags = {}
+ self._declare_consumer()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self):
+ self.cancel()
+
+ def _consume(self):
+ H, T = self.bindings[:-1], self.bindings[-1]
+ for binding in H:
+ binding.consume(self.channel, self._add_tag(binding),
+ self._receive_callback, self.no_ack,
+ nowait=True)
+ T.consume(self.channel, self._add_tag(T),
+ self._receive_callback,
+ self.no_ack, nowait=False)
+
+
+ def _add_tag(self, binding):
+ tag = self._active_tags[binding] = str(_next_tag())
+ return tag
+
+ def _receive_callback(self, raw_message):
+ message = self.channel.message_to_python(raw_message)
+ self.receive(message.payload, message)
+
+ def receive(self, message_data, message):
+ if not self.callbacks:
+ raise NotImplementedError("No consumer callbacks registered")
+ for callback in self.callbacks:
+ callback(message_data, message)
+
+ def register_callback(self, callback):
+ self.callbacks.append(callback)
+
+ def purge(self):
+ for binding in self.bindings:
+ self.binding.purge(self.channel)
+
+ def cancel(self):
+ for binding, tag in self._active_tags.items():
+ binding.cancel(self.channel, tag)
+
+ def flow(self, active):
+ self.channel.flow(active)
+
+ def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
+ self.channel.qos(prefetch_size, prefetch_count, apply_global)
diff --git a/kombu/serialization.py b/kombu/serialization.py
new file mode 100644
index 00000000..0e6d1f66
--- /dev/null
+++ b/kombu/serialization.py
@@ -0,0 +1,279 @@
+"""
+Centralized support for encoding/decoding of data structures.
+Requires a json library (`cjson`_, `simplejson`_, or `Python 2.6+`_).
+
+Pickle support is built-in.
+
+Optionally installs support for ``YAML`` if the `PyYAML`_ package
+is installed.
+
+Optionally installs support for `msgpack`_ if the `msgpack-python`_
+package is installed.
+
+.. _`cjson`: http://pypi.python.org/pypi/python-cjson/
+.. _`simplejson`: http://code.google.com/p/simplejson/
+.. _`Python 2.6+`: http://docs.python.org/library/json.html
+.. _`PyYAML`: http://pyyaml.org/
+.. _`msgpack`: http://msgpack.sourceforge.net/
+.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/
+
+"""
+
+import codecs
+
+__all__ = ['SerializerNotInstalled', 'registry']
+
+
+class SerializerNotInstalled(StandardError):
+ """Support for the requested serialization type is not installed"""
+
+
+class SerializerRegistry(object):
+ """The registry keeps track of serialization methods."""
+
+ def __init__(self):
+ self._encoders = {}
+ self._decoders = {}
+ self._default_encode = None
+ self._default_content_type = None
+ self._default_content_encoding = None
+
+ def register(self, name, encoder, decoder, content_type,
+ content_encoding='utf-8'):
+ """Register a new encoder/decoder.
+
+ :param name: A convenience name for the serialization method.
+
+ :param encoder: A method that will be passed a python data structure
+ and should return a string representing the serialized data.
+ If ``None``, then only a decoder will be registered. Encoding
+ will not be possible.
+
+ :param decoder: A method that will be passed a string representing
+ serialized data and should return a python data structure.
+ If ``None``, then only an encoder will be registered.
+ Decoding will not be possible.
+
+ :param content_type: The mime-type describing the serialized
+ structure.
+
+ :param content_encoding: The content encoding (character set) that
+ the :param:`decoder` method will be returning. Will usually be
+ ``utf-8``, ``us-ascii``, or ``binary``.
+
+ """
+ if encoder:
+ self._encoders[name] = (content_type, content_encoding, encoder)
+ if decoder:
+ self._decoders[content_type] = decoder
+
+ def _set_default_serializer(self, name):
+ """
+ Set the default serialization method used by this library.
+
+ :param name: The name of the registered serialization method.
+ For example, ``json`` (default), ``pickle``, ``yaml``,
+ or any custom methods registered using :meth:`register`.
+
+ :raises SerializerNotInstalled: If the serialization method
+ requested is not available.
+ """
+ try:
+ (self._default_content_type, self._default_content_encoding,
+ self._default_encode) = self._encoders[name]
+ except KeyError:
+ raise SerializerNotInstalled(
+ "No encoder installed for %s" % name)
+
+ def encode(self, data, serializer=None):
+ """
+ Serialize a data structure into a string suitable for sending
+ as an AMQP message body.
+
+ :param data: The message data to send. Can be a list,
+ dictionary or a string.
+
+ :keyword serializer: An optional string representing
+ the serialization method you want the data marshalled
+ into. (For example, ``json``, ``raw``, or ``pickle``).
+
+ If ``None`` (default), then `JSON`_ will be used, unless
+ ``data`` is a ``str`` or ``unicode`` object. In this
+ latter case, no serialization occurs as it would be
+ unnecessary.
+
+ Note that if ``serializer`` is specified, then that
+ serialization method will be used even if a ``str``
+ or ``unicode`` object is passed in.
+
+ :returns: A three-item tuple containing the content type
+ (e.g., ``application/json``), content encoding, (e.g.,
+ ``utf-8``) and a string containing the serialized
+ data.
+
+ :raises SerializerNotInstalled: If the serialization method
+ requested is not available.
+ """
+ if serializer == "raw":
+ return raw_encode(data)
+ if serializer and not self._encoders.get(serializer):
+ raise SerializerNotInstalled(
+ "No encoder installed for %s" % serializer)
+
+ # If a raw string was sent, assume binary encoding
+ # (it's likely either ASCII or a raw binary file, but 'binary'
+ # charset will encompass both, even if not ideal.
+ if not serializer and isinstance(data, str):
+ # In Python 3+, this would be "bytes"; allow binary data to be
+ # sent as a message without getting encoder errors
+ return "application/data", "binary", data
+
+ # For unicode objects, force it into a string
+ if not serializer and isinstance(data, unicode):
+ payload = data.encode("utf-8")
+ return "text/plain", "utf-8", payload
+
+ if serializer:
+ content_type, content_encoding, encoder = \
+ self._encoders[serializer]
+ else:
+ encoder = self._default_encode
+ content_type = self._default_content_type
+ content_encoding = self._default_content_encoding
+
+ payload = encoder(data)
+ return content_type, content_encoding, payload
+
+ def decode(self, data, content_type, content_encoding):
+ """Deserialize a data stream as serialized using ``encode``
+ based on :param:`content_type`.
+
+ :param data: The message data to deserialize.
+
+ :param content_type: The content-type of the data.
+ (e.g., ``application/json``).
+
+ :param content_encoding: The content-encoding of the data.
+ (e.g., ``utf-8``, ``binary``, or ``us-ascii``).
+
+ :returns: The unserialized data.
+ """
+ content_type = content_type or 'application/data'
+ content_encoding = (content_encoding or 'utf-8').lower()
+
+ # Don't decode 8-bit strings or unicode objects
+ if content_encoding not in ('binary', 'ascii-8bit') and \
+ not isinstance(data, unicode):
+ data = codecs.decode(data, content_encoding)
+
+ try:
+ decoder = self._decoders[content_type]
+ except KeyError:
+ return data
+
+ return decoder(data)
+
+
+"""
+.. data:: registry
+
+Global registry of serializers/deserializers.
+
+"""
+registry = SerializerRegistry()
+
+"""
+.. function:: encode(data, serializer=default_serializer)
+
+Encode data using the registry's default encoder.
+
+"""
+encode = registry.encode
+
+"""
+.. function:: decode(data, content_type, content_encoding):
+
+Decode data using the registry's default decoder.
+
+"""
+decode = registry.decode
+
+
+def raw_encode(data):
+ """Special case serializer."""
+ content_type = 'application/data'
+ payload = data
+ if isinstance(payload, unicode):
+ content_encoding = 'utf-8'
+ payload = payload.encode(content_encoding)
+ else:
+ content_encoding = 'binary'
+ return content_type, content_encoding, payload
+
+
+def register_json():
+ """Register a encoder/decoder for JSON serialization."""
+ from anyjson import serialize as json_serialize
+ from anyjson import deserialize as json_deserialize
+
+ registry.register('json', json_serialize, json_deserialize,
+ content_type='application/json',
+ content_encoding='utf-8')
+
+
+def register_yaml():
+ """Register a encoder/decoder for YAML serialization.
+
+ It is slower than JSON, but allows for more data types
+ to be serialized. Useful if you need to send data such as dates"""
+ try:
+ import yaml
+ registry.register('yaml', yaml.safe_dump, yaml.safe_load,
+ content_type='application/x-yaml',
+ content_encoding='utf-8')
+ except ImportError:
+
+ def not_available(*args, **kwargs):
+ """In case a client receives a yaml message, but yaml
+ isn't installed."""
+ raise SerializerNotInstalled(
+ "No decoder installed for YAML. Install the PyYAML library")
+ registry.register('yaml', None, not_available, 'application/x-yaml')
+
+
+def register_pickle():
+ """The fastest serialization method, but restricts
+ you to python clients."""
+ import cPickle
+ registry.register('pickle', cPickle.dumps, cPickle.loads,
+ content_type='application/x-python-serialize',
+ content_encoding='binary')
+
+
+def register_msgpack():
+ """See http://msgpack.sourceforge.net/"""
+ try:
+ import msgpack
+ registry.register('msgpack', msgpack.packs, msgpack.unpacks,
+ content_type='application/x-msgpack',
+ content_encoding='utf-8')
+ except ImportError:
+
+ def not_available(*args, **kwargs):
+ """In case a client receives a msgpack message, but yaml
+ isn't installed."""
+ raise SerializerNotInstalled(
+ "No decoder installed for msgpack. "
+ "Install the msgpack library")
+ registry.register('msgpack', None, not_available,
+ 'application/x-msgpack')
+
+# Register the base serialization methods.
+register_json()
+register_pickle()
+register_yaml()
+register_msgpack()
+
+# JSON is assumed to always be available, so is the default.
+# (this matches the historical use of kombu.)
+registry._set_default_serializer('json')
diff --git a/kombu/utils.py b/kombu/utils.py
new file mode 100644
index 00000000..37ba1aed
--- /dev/null
+++ b/kombu/utils.py
@@ -0,0 +1,66 @@
+from uuid import UUID, uuid4, _uuid_generate_random
+try:
+ import ctypes
+except ImportError:
+ ctypes = None
+
+
+def gen_unique_id():
+ """Generate a unique id, having - hopefully - a very small chance of
+ collission.
+
+ For now this is provided by :func:`uuid.uuid4`.
+ """
+ # Workaround for http://bugs.python.org/issue4607
+ if ctypes and _uuid_generate_random:
+ buffer = ctypes.create_string_buffer(16)
+ _uuid_generate_random(buffer)
+ return str(UUID(bytes=buffer.raw))
+ return str(uuid4())
+
+
+def _compat_rl_partition(S, sep, direction=None):
+ if direction is None:
+ direction = S.split
+ items = direction(sep, 1)
+ if len(items) == 1:
+ return items[0], sep, ''
+ return items[0], sep, items[1]
+
+
+def _compat_partition(S, sep):
+ """``partition(S, sep) -> (head, sep, tail)``
+
+ Search for the separator ``sep`` in ``S``, and return the part before
+ it, the separator itself, and the part after it. If the separator is not
+ found, return ``S`` and two empty strings.
+
+ """
+ return _compat_rl_partition(S, sep, direction=S.split)
+
+
+def _compat_rpartition(S, sep):
+ """``rpartition(S, sep) -> (tail, sep, head)``
+
+ Search for the separator ``sep`` in ``S``, starting at the end of ``S``,
+ and return the part before it, the separator itself, and the part
+ after it. If the separator is not found, return two empty
+ strings and ``S``.
+
+ """
+ return _compat_rl_partition(S, sep, direction=S.rsplit)
+
+
+
+def partition(S, sep):
+ if hasattr(S, 'partition'):
+ return S.partition(sep)
+ else: # Python <= 2.4:
+ return _compat_partition(S, sep)
+
+
+def rpartition(S, sep):
+ if hasattr(S, 'rpartition'):
+ return S.rpartition(sep)
+ else: # Python <= 2.4:
+ return _compat_rpartition(S, sep)
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 00000000..fd518807
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,16 @@
+[nosetests]
+verbosity = 1
+detailed-errors = 1
+
+[build_sphinx]
+source-dir = docs/
+build-dir = docs/.build
+all_files = 1
+
+[upload_sphinx]
+upload-dir = docs/.build/html
+
+
+[bdist_rpm]
+requires = python-anyjson
+ python-amqplib >= 0.6
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..581788ef
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+import codecs
+
+try:
+ from setuptools import setup, find_packages
+except ImportError:
+ from ez_setup import use_setuptools
+ use_setuptools()
+ from setuptools import setup, find_packages
+
+from distutils.command.install_data import install_data
+from distutils.command.install import INSTALL_SCHEMES
+import sys
+
+import kombu
+
+packages, data_files = [], []
+root_dir = os.path.dirname(__file__)
+if root_dir != '':
+ os.chdir(root_dir)
+src_dir = "kombu"
+
+
+def osx_install_data(install_data):
+
+ def finalize_options(self):
+ self.set_undefined_options("install", ("install_lib", "install_dir"))
+ install_data.finalize_options(self)
+
+
+def fullsplit(path, result=None):
+ if result is None:
+ result = []
+ head, tail = os.path.split(path)
+ if head == '':
+ return [tail] + result
+ if head == path:
+ return result
+ return fullsplit(head, [tail] + result)
+
+
+for scheme in INSTALL_SCHEMES.values():
+ scheme['data'] = scheme['purelib']
+
+for dirpath, dirnames, filenames in os.walk(src_dir):
+ # Ignore dirnames that start with '.'
+ for i, dirname in enumerate(dirnames):
+ if dirname.startswith("."):
+ del dirnames[i]
+ for filename in filenames:
+ if filename.endswith(".py"):
+ packages.append('.'.join(fullsplit(dirpath)))
+ else:
+ data_files.append([dirpath, [os.path.join(dirpath, f) for f in
+ filenames]])
+
+if os.path.exists("README.rst"):
+ long_description = codecs.open('README.rst', "r", "utf-8").read()
+else:
+ long_description = "See http://pypi.python.org/pypi/kombu"
+
+setup(
+ name='kombu',
+ version=kombu.__version__,
+ description=kombu.__doc__,
+ author=kombu.__author__,
+ author_email=kombu.__contact__,
+ url=kombu.__homepage__,
+ platforms=["any"],
+ packages=packages,
+ data_files=data_files,
+ zip_safe=False,
+ test_suite="nose.collector",
+ install_requires=[
+ 'anyjson',
+ 'amqplib>=0.6',
+ ],
+ classifiers=[
+ "Development Status :: 4 - Beta",
+ "Framework :: Django",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python",
+ "License :: OSI Approved :: BSD License",
+ "Intended Audience :: Developers",
+ "Topic :: Communications",
+ "Topic :: System :: Distributed Computing",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ long_description=long_description,
+)