diff options
27 files changed, 2166 insertions, 0 deletions
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 00000000..284d232b
--- /dev/null
@@ -0,0 +1,19 @@
+ AUTHORS (in chronological order)
+Ask Solem <>
+Travis Cline <>
+Rune Halvorsen <>
+Sean Creeley <>
+Jason Cater <>
+Ian Struble <>
+Patrick Schneider <>
+Travis Swicegood <>
+Stephen Day <>
+Andrew Watts
+Paul McLanahan <>
+Ralf Nyren <>
+Jeff Balogh <>
+Adam Wentz
+Vincent Driessen <>
diff --git a/Changelog b/Changelog
new file mode 100644
index 00000000..66f0f036
--- /dev/null
+++ b/Changelog
@@ -0,0 +1,8 @@
+ Change history
+* Rewrite of carrot
diff --git a/FAQ b/FAQ
new file mode 100644
index 00000000..d8f138c9
--- /dev/null
+++ b/FAQ
@@ -0,0 +1,19 @@
+ Frequently Asked Questions
+Q: Message.reject doesn't work?
+**Answer**: RabbitMQ (as of v1.5.5) has not implemented reject yet.
+There was a brief discussion about it on their mailing list, and the reason
+why it's not implemented yet is revealed:
+Q: Message.requeue doesn't work?
+**Answer**: See _`Message.reject doesn't work?`
diff --git a/INSTALL b/INSTALL
new file mode 100644
index 00000000..3d86448c
--- /dev/null
@@ -0,0 +1,21 @@
+You can install ``carrot`` either via the Python Package Index (PyPI)
+or from source.
+To install using ``pip``,::
+ $ pip install carrot
+To install using ``easy_install``,::
+ $ easy_install carrot
+If you have downloaded a source tarball you can install it
+by doing the following,::
+ $ python build
+ # python install # as root
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 00000000..81d19cef
--- /dev/null
@@ -0,0 +1,28 @@
+Copyright (c) 2009, Ask Solem
+All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+Neither the name of Ask Solem nor the names of its contributors may be used
+to endorse or promote products derived from this software without specific
+prior written permission.
diff --git a/ b/
new file mode 100644
index 00000000..08498736
--- /dev/null
+++ b/
@@ -0,0 +1,12 @@
+include AUTHORS
+include Changelog
+include FAQ
+include INSTALL
+include LICENSE
+include README.rst
+include README
+include THANKS
+include TODO
+recursive-include docs *
+recursive-include kombu *.py
diff --git a/README b/README
new file mode 120000
index 00000000..92cacd28
--- /dev/null
+++ b/README
@@ -0,0 +1 @@
+README.rst \ No newline at end of file
diff --git a/README.rst b/README.rst
new file mode 100644
index 00000000..1678a814
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,391 @@
+ kombu - AMQP Messaging Framework for Python
+:Version: 0.10.5
+`carrot` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message
+Queuing Protocol, an open standard protocol for message orientation, queuing,
+routing, reliability and security.
+The aim of `carrot` is to make messaging in Python as easy as possible by
+providing a high-level interface for producing and consuming messages. At the
+same time it is a goal to re-use what is already available as much as possible.
+`carrot` has pluggable messaging back-ends, so it is possible to support
+several messaging systems. Currently, there is support for `AMQP`_
+(`py-amqplib`_, `pika`_), `STOMP`_ (`python-stomp`_). There's also an
+in-memory backend for testing purposes, using the `Python queue module`_.
+Several AMQP message broker implementations exists, including `RabbitMQ`_,
+`ZeroMQ`_ and `Apache ActiveMQ`_. You'll need to have one of these installed,
+personally we've been using `RabbitMQ`_.
+Before you start playing with ``carrot``, you should probably read up on
+AMQP, and you could start with the excellent article about using RabbitMQ
+under Python, `Rabbits and warrens`_. For more detailed information, you can
+refer to the `Wikipedia article about AMQP`_.
+.. _`RabbitMQ`:
+.. _`ZeroMQ`:
+.. _`AMQP`:
+.. _`STOMP`:
+.. _`python-stomp`:
+.. _`Python Queue module`:
+.. _`Apache ActiveMQ`:
+.. _`Django`:
+.. _`Rabbits and warrens`:
+.. _`py-amqplib`:
+.. _`pika`:
+.. _`Wikipedia article about AMQP`:
+Carrot is using Sphinx, and the latest documentation is available at GitHub:
+You can install ``carrot`` either via the Python Package Index (PyPI)
+or from source.
+To install using ``pip``,::
+ $ pip install carrot
+To install using ``easy_install``,::
+ $ easy_install carrot
+If you have downloaded a source tarball you can install it
+by doing the following,::
+ $ python build
+ # python install # as root
+There are some concepts you should be familiar with before starting:
+ * Publishers
+ Publishers sends messages to an exchange.
+ * Exchanges
+ Messages are sent to exchanges. Exchanges are named and can be
+ configured to use one of several routing algorithms. The exchange
+ routes the messages to consumers by matching the routing key in the
+ message with the routing key the consumer provides when binding to
+ the exchange.
+ * Consumers
+ Consumers declares a queue, binds it to a exchange and receives
+ messages from it.
+ * Queues
+ Queues receive messages sent to exchanges. The queues are declared
+ by consumers.
+ * Routing keys
+ Every message has a routing key. The interpretation of the routing
+ key depends on the exchange type. There are four default exchange
+ types defined by the AMQP standard, and vendors can define custom
+ types (so see your vendors manual for details).
+ These are the default exchange types defined by AMQP/0.8:
+ * Direct exchange
+ Matches if the routing key property of the message and
+ the ``routing_key`` attribute of the consumer are identical.
+ * Fan-out exchange
+ Always matches, even if the binding does not have a routing
+ key.
+ * Topic exchange
+ Matches the routing key property of the message by a primitive
+ pattern matching scheme. The message routing key then consists
+ of words separated by dots (``"."``, like domain names), and
+ two special characters are available; star (``"*"``) and hash
+ (``"#"``). The star matches any word, and the hash matches
+ zero or more words. For example ``"*.stock.#"`` matches the
+ routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not
+ ``"stock.nasdaq"``.
+Creating a connection
+ You can set up a connection by creating an instance of
+ ``carrot.messaging.BrokerConnection``, with the appropriate options for
+ your broker:
+ >>> from carrot.connection import BrokerConnection
+ >>> conn = BrokerConnection(hostname="localhost", port=5672,
+ ... userid="test", password="test",
+ ... virtual_host="test")
+ If you're using Django you can use the
+ ``carrot.connection.DjangoBrokerConnection`` class instead, which loads
+ the connection settings from your ````::
+ BROKER_HOST = "localhost"
+ BROKER_PORT = 5672
+ BROKER_USER = "test"
+ BROKER_PASSWORD = "secret"
+ BROKER_VHOST = "/test"
+ Then create a connection by doing:
+ >>> from carrot.connection import DjangoBrokerConnection
+ >>> conn = DjangoBrokerConnection()
+Receiving messages using a Consumer
+First we open up a Python shell and start a message consumer.
+This consumer declares a queue named ``"feed"``, receiving messages with
+the routing key ``"importer"`` from the ``"feed"`` exchange.
+The example then uses the consumers ``wait()`` method to go into consume
+mode, where it continuously polls the queue for new messages, and when a
+message is received it passes the message to all registered callbacks.
+ >>> from carrot.messaging import Consumer
+ >>> consumer = Consumer(connection=conn, queue="feed",
+ ... exchange="feed", routing_key="importer")
+ >>> def import_feed_callback(message_data, message)
+ ... feed_url = message_data["import_feed"]
+ ... print("Got feed import message for: %s" % feed_url)
+ ... # something importing this feed url
+ ... # import_feed(feed_url)
+ ... message.ack()
+ >>> consumer.register_callback(import_feed_callback)
+ >>> consumer.wait() # Go into the consumer loop.
+Sending messages using a Publisher
+Then we open up another Python shell to send some messages to the consumer
+defined in the last section.
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed", routing_key="importer")
+ >>> publisher.send({"import_feed": ""})
+ >>> publisher.close()
+Look in the first Python shell again (where ``consumer.wait()`` is running),
+where the following text has been printed to the screen::
+ Got feed import message for:
+Serialization of Data
+By default every message is encoded using `JSON`_, so sending
+Python data structures like dictionaries and lists works.
+`YAML`_, `msgpack`_ and Python's built-in ``pickle`` module is also supported,
+and if needed you can register any custom serialization scheme you
+want to use.
+.. _`JSON`:
+.. _`YAML`:
+.. _`msgpack`:
+Each option has its advantages and disadvantages.
+``json`` -- JSON is supported in many programming languages, is now
+ a standard part of Python (since 2.6), and is fairly fast to
+ decode using the modern Python libraries such as ``cjson or
+ ``simplejson``.
+ The primary disadvantage to ``JSON`` is that it limits you to
+ the following data types: strings, unicode, floats, boolean,
+ dictionaries, and lists. Decimals and dates are notably missing.
+ Also, binary data will be transferred using base64 encoding, which
+ will cause the transferred data to be around 34% larger than an
+ encoding which supports native binary types.
+ However, if your data fits inside the above constraints and
+ you need cross-language support, the default setting of ``JSON``
+ is probably your best choice.
+``pickle`` -- If you have no desire to support any language other than
+ Python, then using the ``pickle`` encoding will gain you
+ the support of all built-in Python data types (except class instances),
+ smaller messages when sending binary files, and a slight speedup
+ over ``JSON`` processing.
+``yaml`` -- YAML has many of the same characteristics as ``json``,
+ except that it natively supports more data types (including dates,
+ recursive references, etc.)
+ However, the Python libraries for YAML are a good bit slower
+ than the libraries for JSON.
+ If you need a more expressive set of data types and need to maintain
+ cross-language compatibility, then ``YAML`` may be a better fit
+ than the above.
+To instruct carrot to use an alternate serialization method,
+use one of the following options.
+ 1. Set the serialization option on a per-Publisher basis:
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed", routing_key="importer",
+ ... serializer="yaml")
+ 2. Set the serialization option on a per-call basis
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed", routing_key="importer")
+ >>> publisher.send({"import_feed": ""},
+ ... serializer="pickle")
+ >>> publisher.close()
+Note that ``Consumer``s do not need the serialization method specified in
+their code. They can auto-detect the serialization method since we supply
+the ``Content-type`` header as part of the AMQP message.
+Sending raw data without Serialization
+In some cases, you don't need your message data to be serialized. If you
+pass in a plain string or unicode object as your message, then carrot will
+not waste cycles serializing/deserializing the data.
+You can optionally specify a ``content_type`` and ``content_encoding``
+for the raw data:
+ >>> from carrot.messaging import Publisher
+ >>> publisher = Publisher(connection=conn,
+ ... exchange="feed",
+ routing_key="import_pictures")
+ >>> publisher.send(open('~/my_picture.jpg','rb').read(),
+ content_type="image/jpeg",
+ content_encoding="binary")
+ >>> publisher.close()
+The ``message`` object returned by the ``Consumer`` class will have a
+``content_type`` and ``content_encoding`` attribute.
+Receiving messages without a callback
+You can also poll the queue manually, by using the ``fetch`` method.
+This method returns a ``Message`` object, from where you can get the
+message body, de-serialize the body to get the data, acknowledge, reject or
+re-queue the message.
+ >>> consumer = Consumer(connection=conn, queue="feed",
+ ... exchange="feed", routing_key="importer")
+ >>> message = consumer.fetch()
+ >>> if message:
+ ... message_data = message.payload
+ ... message.ack()
+ ... else:
+ ... # No messages waiting on the queue.
+ >>> consumer.close()
+Sub-classing the messaging classes
+The ``Consumer``, and ``Publisher`` classes can also be sub classed. Thus you
+can define the above publisher and consumer like so:
+ >>> from carrot.messaging import Publisher, Consumer
+ >>> class FeedPublisher(Publisher):
+ ... exchange = "feed"
+ ... routing_key = "importer"
+ ...
+ ... def import_feed(self, feed_url):
+ ... return self.send({"action": "import_feed",
+ ... "feed_url": feed_url})
+ >>> class FeedConsumer(Consumer):
+ ... queue = "feed"
+ ... exchange = "feed"
+ ... routing_key = "importer"
+ ...
+ ... def receive(self, message_data, message):
+ ... action = message_data["action"]
+ ... if action == "import_feed":
+ ... # something importing this feed
+ ... # import_feed(message_data["feed_url"])
+ message.ack()
+ ... else:
+ ... raise Exception("Unknown action: %s" % action)
+ >>> publisher = FeedPublisher(connection=conn)
+ >>> publisher.import_feed("")
+ >>> publisher.close()
+ >>> consumer = FeedConsumer(connection=conn)
+ >>> consumer.wait() # Go into the consumer loop.
+Getting Help
+Mailing list
+Join the `carrot-users`_ mailing list.
+.. _`carrot-users`:
+Bug tracker
+If you have any suggestions, bug reports or annoyances please report them
+to our issue tracker at
+Development of ``carrot`` happens at Github:
+You are highly encouraged to participate in the development. If you don't
+like Github (for some reason) you're welcome to send regular patches.
+This software is licensed under the ``New BSD License``. See the ``LICENSE``
+file in the top distribution directory for the full license text.
diff --git a/THANKS b/THANKS
new file mode 100644
index 00000000..6757ee47
--- /dev/null
+++ b/THANKS
@@ -0,0 +1,6 @@
+Thanks to Barry Pederson <> for the py-amqplib library.
+Thanks to Grégoire Cachet <> for bug reports.
+Thanks to Martin Mahner for the Sphinx theme.
+Thanks to jcater for bug reports.
+Thanks to sebest for bug reports.
+Thanks to greut for bug reports
diff --git a/TODO b/TODO
new file mode 100644
index 00000000..992504b8
--- /dev/null
+++ b/TODO
@@ -0,0 +1,2 @@
+Please see our Issue Tracker at GitHub:
diff --git a/contrib/doc2ghpages b/contrib/doc2ghpages
new file mode 100755
index 00000000..5ebc7aaa
--- /dev/null
+++ b/contrib/doc2ghpages
@@ -0,0 +1,13 @@
+git checkout master
+(cd docs;
+ rm -rf .build;
+ make html;
+ (cd .build/html;
+ sphinx-to-github;))
+git checkout gh-pages
+cp -r docs/.build/html/* .
+git commit . -m "Autogenerated documentation for github."
+git push origin gh-pages
+git checkout master
diff --git a/contrib/requirements/default.txt b/contrib/requirements/default.txt
new file mode 100644
index 00000000..63c3428e
--- /dev/null
+++ b/contrib/requirements/default.txt
@@ -0,0 +1,2 @@
diff --git a/contrib/requirements/test.txt b/contrib/requirements/test.txt
new file mode 100644
index 00000000..bf222f70
--- /dev/null
+++ b/contrib/requirements/test.txt
@@ -0,0 +1,7 @@
diff --git a/docs/rewrite.rst b/docs/rewrite.rst
new file mode 100644
index 00000000..c9bbfed6
--- /dev/null
+++ b/docs/rewrite.rst
@@ -0,0 +1,41 @@
+.. code-block:: python
+ from carrot import Connection, Exchange, Binding
+ from carrot import Consumer, Producer
+ media_exchange = Exchange("media", "direct", durable=True)
+ video_binding = Binding("video", exchange=media_exchange, key="video")
+ # connections/channels
+ connection = Connection("localhost", "guest", "guest", "/")
+ channel =
+ # produce
+ producer = Producer(channel, exchange=media_exchange, serializer="json")
+ producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013})
+ # consume
+ consumer = Consumer(channel, video_binding)
+ consumer.register_callback(process_media)
+ consumer.consume()
+ while True:
+ connection.drain_events()
+ # consumerset:
+ video_binding = Binding("video", exchange=media_exchange, key="video")
+ image_binding = Binding("image", exchange=media_exchange, key="image")
+ consumer = Consumer(channel, [video_binding, image_binding])
diff --git a/kombu/ b/kombu/
new file mode 100644
index 00000000..d88bc9cc
--- /dev/null
+++ b/kombu/
@@ -0,0 +1,7 @@
+"""AMQP Messaging Framework for Python"""
+VERSION = (0, 10, 5)
+__version__ = ".".join(map(str, VERSION))
+__author__ = "Ask Solem"
+__contact__ = ""
+__homepage__ = ""
+__docformat__ = "restructuredtext"
diff --git a/kombu/backends/ b/kombu/backends/
new file mode 100644
index 00000000..e2263e60
--- /dev/null
+++ b/kombu/backends/
@@ -0,0 +1,43 @@
+import sys
+from kombu.utils import rpartition
+DEFAULT_BACKEND = "kombu.backends.pyamqplib.Backend"
+ "amqplib": "kombu.backends.pyamqplib.Backend",
+ "pika": "kombu.backends.pikachu.AsyncoreBackend",
+ "syncpika": "kombu.backends.pikachu.SyncBackend",
+_backend_cache = {}
+def resolve_backend(backend=None):
+ backend = BACKEND_ALIASES.get(backend, backend)
+ backend_module_name, _, backend_cls_name = rpartition(backend, ".")
+ return backend_module_name, backend_cls_name
+def _get_backend_cls(backend=None):
+ backend_module_name, backend_cls_name = resolve_backend(backend)
+ __import__(backend_module_name)
+ backend_module = sys.modules[backend_module_name]
+ return getattr(backend_module, backend_cls_name)
+def get_backend_cls(backend=None):
+ """Get backend class by name.
+ The backend string is the full path to a backend class, e.g.::
+ "kombu.backends.pyamqplib.Backend"
+ If the name does not include "``.``" (is not fully qualified),
+ the alias table will be consulted.
+ """
+ backend = backend or DEFAULT_BACKEND
+ if backend not in _backend_cache:
+ _backend_cache[backend] = _get_backend_cls(backend)
+ return _backend_cache[backend]
diff --git a/kombu/backends/ b/kombu/backends/
new file mode 100644
index 00000000..4413e119
--- /dev/null
+++ b/kombu/backends/
@@ -0,0 +1,104 @@
+Backend base classes.
+from kombu import serialization
+class MessageStateError(Exception):
+ """The message has already been acknowledged."""
+class BaseMessage(object):
+ """Base class for received messages."""
+ _state = None
+ MessageStateError = MessageStateError
+ def __init__(self, channel, body=None, delivery_tag=None,
+ content_type=None, content_encoding=None, delivery_info={}, **kwargs):
+ = channel
+ self._decoded_cache = None
+ self._state = "RECEIVED"
+ def decode(self):
+ """Deserialize the message body, returning the original
+ python structure sent by the publisher."""
+ return serialization.decode(self.body, self.content_type,
+ self.content_encoding)
+ @property
+ def payload(self):
+ """The decoded message."""
+ if not self._decoded_cache:
+ self._decoded_cache = self.decode()
+ return self._decoded_cache
+ def ack(self):
+ """Acknowledge this message as being processed.,
+ This will remove the message from the queue.
+ :raises MessageStateError: If the message has already been
+ acknowledged/requeued/rejected.
+ """
+ if self.acknowledged:
+ raise self.MessageStateError(
+ "Message already acknowledged with state: %s" % self._state)
+ self._state = "ACK"
+ def reject(self):
+ """Reject this message.
+ The message will be discarded by the server.
+ :raises MessageStateError: If the message has already been
+ acknowledged/requeued/rejected.
+ """
+ if self.acknowledged:
+ raise self.MessageStateError(
+ "Message already acknowledged with state: %s" % self._state)
+ self._state = "REJECTED"
+ def requeue(self):
+ """Reject this message and put it back on the queue.
+ You must not use this method as a means of selecting messages
+ to process.
+ :raises MessageStateError: If the message has already been
+ acknowledged/requeued/rejected.
+ """
+ if self.acknowledged:
+ raise self.MessageStateError(
+ "Message already acknowledged with state: %s" % self._state)
+, requeue=True)
+ self._state = "REQUEUED"
+ @property
+ def acknowledged(self):
+ return self._state in ACKNOWLEDGED_STATES
+class BaseBackend(object):
+ """Base class for backends."""
+ default_port = None
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ def get_channel(self):
+ raise NotImplementedError("Subclass responsibility")
+ def establish_connection(self):
+ raise NotImplementedError("Subclass responsibility")
+ def close_connection(self):
+ raise NotImplementedError("Subclass responsibility")
diff --git a/kombu/backends/ b/kombu/backends/
new file mode 100644
index 00000000..600366fa
--- /dev/null
+++ b/kombu/backends/
@@ -0,0 +1,244 @@
+from carrot.backends.base import BaseBackend, BaseMessage
+from anyjson import serialize, deserialize
+from itertools import count
+from django.utils.datastructures import SortedDict
+from carrot.utils import gen_unique_id
+import sys
+import time
+import atexit
+from Queue import Empty as QueueEmpty
+from itertools import cycle
+class QueueSet(object):
+ """A set of queues that operates as one."""
+ def __init__(self, backend, queues):
+ self.backend = backend
+ self.queues = queues
+ # an infinite cycle through all the queues.
+ self.cycle = cycle(self.queues)
+ # A set of all the queue names, so we can match when we've
+ # tried all of them.
+ self.all = frozenset(self.queues)
+ def get(self):
+ """Get the next message avaiable in the queue.
+ :returns: The message and the name of the queue it came from as
+ a tuple.
+ :raises Empty: If there are no more items in any of the queues.
+ """
+ # A set of queues we've already tried.
+ tried = set()
+ while True:
+ # Get the next queue in the cycle, and try to get an item off it.
+ queue =
+ try:
+ item = self.backend._get(queue)
+ except Empty:
+ # raises Empty when we've tried all of them.
+ tried.add(queue)
+ if tried == self.all:
+ raise
+ else:
+ return item, queue
+ def __repr__(self):
+ return "<QueueSet: %s>" % repr(self.queue_names)
+class QualityOfService(object):
+ def __init__(self, resource, prefetch_count=None, interval=None):
+ self.resource = resource
+ self.prefetch_count = prefetch_count
+ self.interval = interval
+ self._delivered = SortedDict()
+ self._restored_once = False
+ atexit.register(self.restore_unacked_once)
+ def can_consume(self):
+ return len(self._delivered) > self.prefetch_count
+ def append(self, message, queue_name, delivery_tag):
+ self._delivered[delivery_tag] = message, queue_name
+ def ack(self, delivery_tag):
+ self._delivered.pop(delivery_tag, None)
+ def restore_unacked(self):
+ for message, queue_name in self._delivered.items():
+ self.resource.put(queue_name, message)
+ self._delivered = SortedDict()
+ def requeue(self, delivery_tag):
+ try:
+ message, queue_name = self._delivered.pop(delivery_tag)
+ except KeyError:
+ pass
+ self.resource.put(queue_name, message)
+ def restore_unacked_once(self):
+ if not self._restored_once:
+ if self._delivered:
+ sys.stderr.write(
+ "Restoring unacknowledged messages: %s\n" % (
+ self._delivered))
+ self.restore_unacked()
+ if self._delivered:
+ sys.stderr.write("UNRESTORED MESSAGES: %s\n" % (
+ self._delivered))
+class Message(BaseMessage):
+ def __init__(self, backend, payload, **kwargs):
+ self.backend = backend
+ payload = deserialize(payload)
+ kwargs["body"] = payload.get("body").encode("utf-8")
+ kwargs["delivery_tag"] = payload.get("delivery_tag")
+ kwargs["content_type"] = payload.get("content-type")
+ kwargs["content_encoding"] = payload.get("content-encoding")
+ kwargs["priority"] = payload.get("priority")
+ self.destination = payload.get("destination")
+ super(Message, self).__init__(backend, **kwargs)
+ def reject(self):
+ raise NotImplementedError(
+ "This backend does not implement basic.reject")
+class EmulationBase(BaseBackend):
+ Message = Message
+ default_port = None
+ interval = 1
+ _prefetch_count = None
+ QueueSet = QueueSet
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ self._consumers = {}
+ self._callbacks = {}
+ self._qos_manager = None
+ def establish_connection(self):
+ return self # for drain events
+ def close_connection(self, connection):
+ pass
+ def queue_exists(self, queue):
+ return True
+ def queue_purge(self, queue, **kwargs):
+ return self._purge(queue, **kwargs)
+ def _poll(self, resource):
+ while True:
+ if self.qos_manager.can_consume():
+ try:
+ return resource.get()
+ except QueueEmpty:
+ pass
+ time.sleep(self.interval)
+ def declare_consumer(self, queue, no_ack, callback, consumer_tag,
+ **kwargs):
+ self._consumers[consumer_tag] = queue
+ self._callbacks[queue] = callback
+ def drain_events(self, timeout=None):
+ queueset = self.QueueSet(self._consumers.values())
+ payload, queue = self._poll(queueset)
+ if not queue or queue not in self._callbacks:
+ return
+ self._callbacks[queue](payload)
+ def consume(self, limit=None):
+ for total_message_count in count():
+ if limit and total_message_count >= limit:
+ raise StopIteration
+ self.drain_events()
+ yield True
+ def queue_declare(self, queue, *args, **kwargs):
+ pass
+ def _get_many(self, queues):
+ raise NotImplementedError("Emulations must implement _get_many")
+ def _get(self, queue):
+ raise NotImplementedError("Emulations must implement _get")
+ def _put(self, queue, message):
+ raise NotImplementedError("Emulations must implement _put")
+ def _purge(self, queue, message):
+ raise NotImplementedError("Emulations must implement _purge")
+ def get(self, queue, **kwargs):
+ try:
+ payload = self._get(queue)
+ except QueueEmpty:
+ return None
+ else:
+ return self.message_to_python(payload)
+ def ack(self, delivery_tag):
+ self.qos_manager.ack(delivery_tag)
+ def requeue(self, delivery_tag):
+ self.qos_manager.requeue(delivery_tag)
+ def message_to_python(self, raw_message):
+ message = self.Message(backend=self, payload=raw_message)
+ self.qos_manager.append(message, message.destination,
+ message.delivery_tag)
+ return message
+ def prepare_message(self, message_data, delivery_mode, priority=0,
+ content_type=None, content_encoding=None):
+ return {"body": message_data,
+ "priority": priority or 0,
+ "content-encoding": content_encoding,
+ "content-type": content_type}
+ def publish(self, message, exchange, routing_key, **kwargs):
+ message["destination"] = exchange
+ self._put(exchange, message)
+ def cancel(self, consumer_tag):
+ queue = self._consumers.pop(consumer_tag, None)
+ self._callbacks.pop(queue, None)
+ def close(self):
+ for consumer_tag in self._consumers.keys():
+ self.cancel(consumer_tag)
+ def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
+ self._prefetch_count = prefetch_count
+ @property
+ def qos_manager(self):
+ if self._qos_manager is None:
+ self._qos_manager = QualityOfService(self)
+ # Update prefetch count / interval
+ self._qos_manager.prefetch_count = self._prefetch_count
+ self._qos_manager.interval = self.interval
+ return self._qos_manager
diff --git a/kombu/backends/ b/kombu/backends/
new file mode 100644
index 00000000..507bbbfb
--- /dev/null
+++ b/kombu/backends/
@@ -0,0 +1,212 @@
+import asyncore
+import weakref
+import functools
+import itertools
+import pika
+from carrot.backends.base import BaseMessage, BaseBackend
+class Message(BaseMessage):
+ def __init__(self, backend, amqp_message, **kwargs):
+ channel, method, header, body = amqp_message
+ self._channel = channel
+ self._method = method
+ self._header = header
+ self.backend = backend
+ kwargs.update({"body": body,
+ "delivery_tag": method.delivery_tag,
+ "content_type": header.content_type,
+ "content_encoding": header.content_encoding,
+ "delivery_info": dict(
+ consumer_tag=method.consumer_tag,
+ routing_key=method.routing_key,
+ delivery_tag=method.delivery_tag,
+ super(Message, self).__init__(backend, **kwargs)
+class SyncBackend(BaseBackend):
+ default_port = DEFAULT_PORT
+ _connection_cls = pika.BlockingConnection
+ Message = Message
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ self.default_port = kwargs.get("default_port", self.default_port)
+ self._channel_ref = None
+ @property
+ def _channel(self):
+ return callable(self._channel_ref) and self._channel_ref()
+ @property
+ def channel(self):
+ """If no channel exists, a new one is requested."""
+ if not self._channel:
+ self._channel_ref = weakref.ref(self.connection.get_channel())
+ return self._channel
+ def establish_connection(self):
+ """Establish connection to the AMQP broker."""
+ conninfo = self.connection
+ if not conninfo.port:
+ conninfo.port = self.default_port
+ credentials = pika.PlainCredentials(conninfo.userid,
+ conninfo.password)
+ return self._connection_cls(pika.ConnectionParameters(
+ conninfo.hostname,
+ port=conninfo.port,
+ virtual_host=conninfo.virtual_host,
+ credentials=credentials))
+ def close_connection(self, connection):
+ """Close the AMQP broker connection."""
+ connection.close()
+ def queue_exists(self, queue):
+ return False # FIXME
+ def queue_delete(self, queue, if_unused=False, if_empty=False):
+ """Delete queue by name."""
+ return, if_unused=if_unused,
+ if_empty=if_empty)
+ def queue_purge(self, queue, **kwargs):
+ """Discard all messages in the queue. This will delete the messages
+ and results in an empty queue."""
+ return
+ def queue_declare(self, queue, durable, exclusive, auto_delete,
+ warn_if_exists=False, arguments=None):
+ """Declare a named queue."""
+ return,
+ durable=durable,
+ exclusive=exclusive,
+ auto_delete=auto_delete,
+ arguments=arguments)
+ def exchange_declare(self, exchange, type, durable, auto_delete):
+ """Declare an named exchange."""
+ return,
+ type=type,
+ durable=durable,
+ auto_delete=auto_delete)
+ def queue_bind(self, queue, exchange, routing_key, arguments={}):
+ """Bind queue to an exchange using a routing key."""
+ if not arguments:
+ arguments = {}
+ return,
+ exchange=exchange,
+ routing_key=routing_key,
+ arguments=arguments)
+ def message_to_python(self, raw_message):
+ """Convert encoded message body back to a Python value."""
+ return self.Message(backend=self, amqp_message=raw_message)
+ def get(self, queue, no_ack=False):
+ """Receive a message from a declared queue by name.
+ :returns: A :class:`Message` object if a message was received,
+ ``None`` otherwise. If ``None`` was returned, it probably means
+ there was no messages waiting on the queue.
+ """
+ raw_message =, no_ack=no_ack)
+ if not raw_message:
+ return None
+ return self.message_to_python(raw_message)
+ def declare_consumer(self, queue, no_ack, callback, consumer_tag,
+ nowait=False):
+ """Declare a consumer."""
+ @functools.wraps(callback)
+ def _callback_decode(channel, method, header, body):
+ return callback((channel, method, header, body))
+ return,
+ queue=queue,
+ no_ack=no_ack,
+ consumer_tag=consumer_tag)
+ def consume(self, limit=None):
+ """Returns an iterator that waits for one message at a time."""
+ for total_message_count in itertools.count():
+ if limit and total_message_count >= limit:
+ raise StopIteration
+ self.connection.connection.drain_events()
+ yield True
+ def cancel(self, consumer_tag):
+ """Cancel a channel by consumer tag."""
+ if not self._channel:
+ return
+ def close(self):
+ """Close the channel if open."""
+ if self._channel and not self._channel.handler.channel_close:
+ self._channel.close()
+ self._channel_ref = None
+ def ack(self, delivery_tag):
+ """Acknowledge a message by delivery tag."""
+ return
+ def reject(self, delivery_tag):
+ """Reject a message by deliver tag."""
+ return, requeue=False)
+ def requeue(self, delivery_tag):
+ """Reject and requeue a message by delivery tag."""
+ return, requeue=True)
+ def prepare_message(self, message_data, delivery_mode, priority=None,
+ content_type=None, content_encoding=None):
+ """Encapsulate data into a AMQP message."""
+ properties = pika.BasicProperties(priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ delivery_mode=delivery_mode)
+ return message_data, properties
+ def publish(self, message, exchange, routing_key, mandatory=None,
+ immediate=None, headers=None):
+ """Publish a message to a named exchange."""
+ body, properties = message
+ if headers:
+ properties.headers = headers
+ ret =,
+ properties=properties,
+ exchange=exchange,
+ routing_key=routing_key,
+ mandatory=mandatory,
+ immediate=immediate)
+ if mandatory or immediate:
+ self.close()
+ def qos(self, prefetch_size, prefetch_count, apply_global=False):
+ """Request specific Quality of Service."""
+, prefetch_count,
+ apply_global)
+ def flow(self, active):
+ """Enable/disable flow from peer."""
+class AsyncoreBackend(SyncBackend):
+ _connection_cls = pika.AsyncoreConnection
diff --git a/kombu/backends/ b/kombu/backends/
new file mode 100644
index 00000000..dbc6df69
--- /dev/null
+++ b/kombu/backends/
@@ -0,0 +1,202 @@
+`amqplib`_ backend for carrot.
+.. _`amqplib`:
+import warnings
+import weakref
+from itertools import count
+from amqplib import client_0_8 as amqp
+from amqplib.client_0_8.exceptions import AMQPChannelException
+from kombu.backends.base import BaseMessage, BaseBackend
+class Connection(amqp.Connection):
+ def drain_events(self, allowed_methods=None, timeout=None):
+ """Wait for an event on any channel."""
+ return self.wait_multi(self.channels.values(), timeout=timeout)
+ def wait_multi(self, channels, allowed_methods=None, timeout=None):
+ """Wait for an event on a channel."""
+ chanmap = dict((chan.channel_id, chan) for chan in channels)
+ chanid, method_sig, args, content = self._wait_multiple(
+ chanmap.keys(), allowed_methods, timeout=timeout)
+ channel = chanmap[chanid]
+ if content \
+ and channel.auto_decode \
+ and hasattr(content, 'content_encoding'):
+ try:
+ content.body = content.body.decode(content.content_encoding)
+ except Exception:
+ pass
+ amqp_method = channel._METHOD_MAP.get(method_sig, None)
+ if amqp_method is None:
+ raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
+ if content is None:
+ return amqp_method(channel, args)
+ else:
+ return amqp_method(channel, args, content)
+ def read_timeout(self, timeout=None):
+ if timeout is None:
+ return self.method_reader.read_method()
+ sock = self.transport.sock
+ prev = sock.gettimeout()
+ sock.settimeout(timeout)
+ try:
+ return self.method_reader.read_method()
+ finally:
+ sock.settimeout(prev)
+ def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
+ for channel_id in channel_ids:
+ method_queue = self.channels[channel_id].method_queue
+ for queued_method in method_queue:
+ method_sig = queued_method[0]
+ if (allowed_methods is None) \
+ or (method_sig in allowed_methods) \
+ or (method_sig == (20, 40)):
+ method_queue.remove(queued_method)
+ method_sig, args, content = queued_method
+ return channel_id, method_sig, args, content
+ # Nothing queued, need to wait for a method from the peer
+ while True:
+ channel, method_sig, args, content = self.read_timeout(timeout)
+ if (channel in channel_ids) \
+ and ((allowed_methods is None) \
+ or (method_sig in allowed_methods) \
+ or (method_sig == (20, 40))):
+ return channel, method_sig, args, content
+ # Not the channel and/or method we were looking for. Queue
+ # this method for later
+ self.channels[channel].method_queue.append((method_sig,
+ args,
+ content))
+ #
+ # If we just queued up a method for channel 0 (the Connection
+ # itself) it's probably a close method in reaction to some
+ # error, so deal with it right away.
+ #
+ if channel == 0:
+ self.wait()
+ def channel(self, channel_id=None):
+ try:
+ return self.channels[channel_id]
+ except KeyError:
+ return Channel(self, channel_id)
+class Message(BaseMessage):
+ """A message received by the broker.
+ Usually you don't insantiate message objects yourself, but receive
+ them using a :class:`carrot.messaging.Consumer`.
+ :param backend: see :attr:`backend`.
+ :param amqp_message: see :attr:`_amqp_message`.
+ .. attribute:: body
+ The message body.
+ .. attribute:: delivery_tag
+ The message delivery tag, uniquely identifying this message.
+ .. attribute:: backend
+ The message backend used.
+ A subclass of :class:`carrot.backends.base.BaseBackend`.
+ .. attribute:: _amqp_message
+ A :class:`amqplib.client_0_8.basic_message.Message` instance.
+ This is a private attribute and should not be accessed by
+ production code.
+ """
+ def __init__(self, channel, amqp_message, **kwargs):
+ self._amqp_message = amqp_message
+ = channel
+ for attr_name in ("body",
+ "delivery_tag",
+ "content_type",
+ "content_encoding",
+ "delivery_info"):
+ kwargs[attr_name] = getattr(amqp_message, attr_name, None)
+ super(Message, self).__init__(backend, **kwargs)
+class Channel(amqp.Channel):
+ Message = Message
+ def prepare_message(self, message_data, priority=None,
+ content_type=None, content_encoding=None, headers=None,
+ properties=None):
+ """Encapsulate data into a AMQP message."""
+ return amqp.Message(message_data, priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ properties=properties)
+ def message_to_python(self, raw_message):
+ """Convert encoded message body back to a Python value."""
+ return self.Message(channel=self, amqp_message=raw_message)
+class Backend(BaseBackend):
+ default_port = DEFAULT_PORT
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+ self.default_port = kwargs.get("default_port") or self.default_port
+ def get_channel(self, connection):
+ return
+ def drain_events(self, connection, **kwargs):
+ return connection.drain_events(**kwargs)
+ def establish_connection(self):
+ """Establish connection to the AMQP broker."""
+ conninfo = self.connection
+ if not conninfo.hostname:
+ raise KeyError("Missing hostname for AMQP connection.")
+ if conninfo.userid is None:
+ raise KeyError("Missing user id for AMQP connection.")
+ if conninfo.password is None:
+ raise KeyError("Missing password for AMQP connection.")
+ if not conninfo.port:
+ conninfo.port = self.default_port
+ return Connection(,
+ userid=conninfo.userid,
+ password=conninfo.password,
+ virtual_host=conninfo.virtual_host,
+ insist=conninfo.insist,
+ ssl=conninfo.ssl,
+ connect_timeout=conninfo.connect_timeout)
+ def close_connection(self, connection):
+ """Close the AMQP broker connection."""
+ connection.close()
diff --git a/kombu/ b/kombu/
new file mode 100644
index 00000000..64727dc1
--- /dev/null
+++ b/kombu/
@@ -0,0 +1,79 @@
+from kombu.backends import get_backend_cls
+class BrokerConnection(object):
+ def __init__(self, hostname="localhost", userid="guest",
+ password="guest", virtual_host="/", port=None, **kwargs):
+ self.hostname = hostname
+ self.userid = userid
+ self.password = password
+ self.virtual_host = virtual_host or self.virtual_host
+ self.port = port or self.port
+ self.insist = kwargs.get("insist", False)
+ self.connect_timeout = kwargs.get("connect_timeout", 5)
+ self.ssl = kwargs.get("ssl", False)
+ self.backend_cls = kwargs.get("backend_cls", None)
+ self._closed = None
+ self._connection = None
+ self._backend = None
+ def __enter__(self):
+ return self
+ def __exit__(self, e_type, e_value, e_trace):
+ self.close()
+ def _establish_connection(self):
+ return self.backend.establish_connection()
+ @property
+ def connection(self):
+ if self._closed:
+ return
+ if not self._connection:
+ self._connection = self._establish_connection()
+ self._closed = False
+ return self._connection
+ @property
+ def host(self):
+ """The host as a hostname/port pair separated by colon."""
+ return ":".join([self.hostname, str(self.port)])
+ def get_backend_cls(self):
+ """Get the currently used backend class."""
+ backend_cls = self.backend_cls
+ if not backend_cls or isinstance(backend_cls, basestring):
+ backend_cls = get_backend_cls(backend_cls)
+ return backend_cls
+ def connect(self):
+ """Establish a connection to the AMQP server."""
+ self._closed = False
+ return self.connection
+ def channel(self):
+ """Request a new AMQP channel."""
+ return self.backend.create_channel(self.connection)
+ def drain_events(self, **kwargs):
+ return self.backend.drain_events(self.connection, **kwargs)
+ def close(self):
+ """Close the currently open connection."""
+ try:
+ if self._connection:
+ self.backend.close_connection(self._connection)
+ except socket.error:
+ pass
+ self._closed = True
+ def _create_backend(self):
+ return self.get_backend_cls()(connection=self)
+ @property
+ def backend(self):
+ if self._backend is None:
+ self._backend = self._create_backend()
+ return self._backend
diff --git a/kombu/ b/kombu/
new file mode 100644
index 00000000..c2269a23
--- /dev/null
+++ b/kombu/
@@ -0,0 +1,116 @@
+class Exchange(object):
+ }
+ name = ""
+ type = "direct"
+ routing_key = ""
+ durable = True
+ auto_delete = False
+ _init_opts = ("durable", "auto_delete",
+ "delivery_mode", "auto_declare")
+ def __init__(self, name="", type="", routing_key=None, **kwargs):
+ = name or
+ self.type = type or self.type
+ self.routing_key = routing_key or self.routing_key
+ for opt_name in self._init_opts:
+ opt_value = kwargs.get(opt_name)
+ if opt_value is not None:
+ setattr(self, opt_name, opt_value)
+ self.delivery_mode = self.DELIVERY_MODES.get(self.delivery_mode,
+ self.delivery_mode)
+ def declare(self, channel):
+ """Declare the exchange.
+ Creates the exchange on the broker.
+ """
+ channel.exchange_declare(,
+ type=self.type,
+ durable=self.durable,
+ auto_delete=self.auto_delete)
+ def create_message(self, channel, message_data, delivery_mode=None,
+ priority=None, content_type=None, content_encoding=None,
+ properties=None):
+ properties["delivery_mode"] = delivery_mode or self.delivery_mode
+ return channel.prepare_message(message_data,
+ properties=properties,
+ priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding)
+ def publish(self, channel, message, routing_key=None,
+ mandatory=False, immediate=False):
+ if routing_key is None:
+ routing_key = self.routing_key
+ channel.basic_publish(message,
+, routing_key=routing_key,
+ mandatory=mandatory, immediate=immediate,
+ headers=headers)
+class Binding(object):
+ name = ""
+ exchange = None
+ routing_key = ""
+ durable = True
+ exclusive = False
+ auto_delete = False
+ warn_if_exists = False
+ _init_opts = ("durable", "exclusive", "auto_delete",
+ "warn_if_exists")
+ def __init__(self, name=None, exchange=None, routing_key=None, **kwargs):
+ # Binding.
+ = name or
+ = exchange or
+ self.routing_key = routing_key or self.routing_key
+ # Options
+ for opt_name in self._init_opts:
+ opt_value = kwargs.get(opt_name)
+ if opt_value is not None:
+ setattr(self, opt_name, opt_value)
+ # exclusive implies auto-delete.
+ if self.exclusive:
+ self.auto_delete = True
+ def declare(self, channel):
+ """Declares the queue, the exchange and binds the queue to
+ the exchange."""
+ if
+ if
+ channel.queue_declare(, durable=self.durable,
+ exclusive=self.exclusive,
+ auto_delete=self.auto_delete)
+ channel.queue_bind(,
+ routing_key=self.routing_key)
+ def get(self, channel, no_ack=None):
+ message = channel.basic_get(, no_ack=no_ack)
+ if message:
+ return channel.message_to_python(message)
+ def purge(self, channel):
+ return channel.queue_purge(
+ def consume(self, channel, consumer_tag, callback, no_ack=None,
+ nowait=True):
+ return channel.consume(, no_ack=no_ack,
+ consumer_tag=consumer_tag,
+ callback=callback,
+ nowait=nowait)
+ def cancel(self, channel, consumer_tag):
+ channel.basic_cancel(consumer_tag)
diff --git a/kombu/ b/kombu/
new file mode 100644
index 00000000..cdfae69a
--- /dev/null
+++ b/kombu/
@@ -0,0 +1,136 @@
+from itertools import count
+from kombu import serialization
+from kombu.entity import Exchange, Binding
+class Producer(object):
+ exchange = None
+ serializer = None
+ auto_declare = True
+ def __init__(self, channel, exchange=None, serializer=None,
+ auto_declare=None):
+ = channel
+ = exchange or
+ self.serializer = serializer or self.serializer
+ if auto_declare is not None:
+ self.auto_declare = auto_declare
+ if and self.auto_declare:
+ def prepare(self, message_data, serializer=None,
+ content_type=None, content_encoding=None):
+ # No content_type? Then we're serializing the data internally.
+ if not content_type:
+ serializer = serializer or self.serializer
+ (content_type, content_encoding,
+ message_data) = serialization.encode(message_data,
+ serializer=serializer)
+ else:
+ # If the programmer doesn't want us to serialize,
+ # make sure content_encoding is set.
+ if isinstance(message_data, unicode):
+ if not content_encoding:
+ content_encoding = 'utf-8'
+ message_data = message_data.encode(content_encoding)
+ # If they passed in a string, we can't know anything
+ # about it. So assume it's binary data.
+ elif not content_encoding:
+ content_encoding = 'binary'
+ return message_data, content_type, content_encoding
+ def publish(self, message_data, routing_key=None, delivery_mode=None,
+ mandatory=False, immediate=False, priority=0, content_type=None,
+ content_encoding=None, serializer=None):
+ message_data, content_type, content_encoding = self.prepare(
+ message_data, content_type, content_encoding)
+ message =, message_data,
+ delivery_mode, priority, content_type, content_encoding)
+ return, routing_key, mandatory,
+ immediate)
+_next_tag = count(1).next
+class Consumer(object):
+ no_ack = False
+ auto_declare = True
+ callbacks = None
+ def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
+ callbacks=None):
+ = channel
+ self.bindings = bindings
+ if no_ack is not None:
+ self.no_ack = no_ack
+ if auto_declare is not None:
+ self.auto_declare = auto_declare
+ if callbacks is not None:
+ self.callbacks = callbacks
+ if self.callbacks is None:
+ self.callbacks = []
+ # bindings maybe list
+ if not hasattr(self.bindings, "__iter__"):
+ self.bindings = [self.bindings]
+ if self.auto_declare:
+ for binding in self.bindings:
+ binding.declare(
+ self._active_tags = {}
+ self._declare_consumer()
+ def __enter__(self):
+ return self
+ def __exit__(self):
+ self.cancel()
+ def _consume(self):
+ H, T = self.bindings[:-1], self.bindings[-1]
+ for binding in H:
+ binding.consume(, self._add_tag(binding),
+ self._receive_callback, self.no_ack,
+ nowait=True)
+ T.consume(, self._add_tag(T),
+ self._receive_callback,
+ self.no_ack, nowait=False)
+ def _add_tag(self, binding):
+ tag = self._active_tags[binding] = str(_next_tag())
+ return tag
+ def _receive_callback(self, raw_message):
+ message =
+ self.receive(message.payload, message)
+ def receive(self, message_data, message):
+ if not self.callbacks:
+ raise NotImplementedError("No consumer callbacks registered")
+ for callback in self.callbacks:
+ callback(message_data, message)
+ def register_callback(self, callback):
+ self.callbacks.append(callback)
+ def purge(self):
+ for binding in self.bindings:
+ self.binding.purge(
+ def cancel(self):
+ for binding, tag in self._active_tags.items():
+ binding.cancel(, tag)
+ def flow(self, active):
+ def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
+, prefetch_count, apply_global)
diff --git a/kombu/ b/kombu/
new file mode 100644
index 00000000..0e6d1f66
--- /dev/null
+++ b/kombu/
@@ -0,0 +1,279 @@
+Centralized support for encoding/decoding of data structures.
+Requires a json library (`cjson`_, `simplejson`_, or `Python 2.6+`_).
+Pickle support is built-in.
+Optionally installs support for ``YAML`` if the `PyYAML`_ package
+is installed.
+Optionally installs support for `msgpack`_ if the `msgpack-python`_
+package is installed.
+.. _`cjson`:
+.. _`simplejson`:
+.. _`Python 2.6+`:
+.. _`PyYAML`:
+.. _`msgpack`:
+.. _`msgpack-python`:
+import codecs
+__all__ = ['SerializerNotInstalled', 'registry']
+class SerializerNotInstalled(StandardError):
+ """Support for the requested serialization type is not installed"""
+class SerializerRegistry(object):
+ """The registry keeps track of serialization methods."""
+ def __init__(self):
+ self._encoders = {}
+ self._decoders = {}
+ self._default_encode = None
+ self._default_content_type = None
+ self._default_content_encoding = None
+ def register(self, name, encoder, decoder, content_type,
+ content_encoding='utf-8'):
+ """Register a new encoder/decoder.
+ :param name: A convenience name for the serialization method.
+ :param encoder: A method that will be passed a python data structure
+ and should return a string representing the serialized data.
+ If ``None``, then only a decoder will be registered. Encoding
+ will not be possible.
+ :param decoder: A method that will be passed a string representing
+ serialized data and should return a python data structure.
+ If ``None``, then only an encoder will be registered.
+ Decoding will not be possible.
+ :param content_type: The mime-type describing the serialized
+ structure.
+ :param content_encoding: The content encoding (character set) that
+ the :param:`decoder` method will be returning. Will usually be
+ ``utf-8``, ``us-ascii``, or ``binary``.
+ """
+ if encoder:
+ self._encoders[name] = (content_type, content_encoding, encoder)
+ if decoder:
+ self._decoders[content_type] = decoder
+ def _set_default_serializer(self, name):
+ """
+ Set the default serialization method used by this library.
+ :param name: The name of the registered serialization method.
+ For example, ``json`` (default), ``pickle``, ``yaml``,
+ or any custom methods registered using :meth:`register`.
+ :raises SerializerNotInstalled: If the serialization method
+ requested is not available.
+ """
+ try:
+ (self._default_content_type, self._default_content_encoding,
+ self._default_encode) = self._encoders[name]
+ except KeyError:
+ raise SerializerNotInstalled(
+ "No encoder installed for %s" % name)
+ def encode(self, data, serializer=None):
+ """
+ Serialize a data structure into a string suitable for sending
+ as an AMQP message body.
+ :param data: The message data to send. Can be a list,
+ dictionary or a string.
+ :keyword serializer: An optional string representing
+ the serialization method you want the data marshalled
+ into. (For example, ``json``, ``raw``, or ``pickle``).
+ If ``None`` (default), then `JSON`_ will be used, unless
+ ``data`` is a ``str`` or ``unicode`` object. In this
+ latter case, no serialization occurs as it would be
+ unnecessary.
+ Note that if ``serializer`` is specified, then that
+ serialization method will be used even if a ``str``
+ or ``unicode`` object is passed in.
+ :returns: A three-item tuple containing the content type
+ (e.g., ``application/json``), content encoding, (e.g.,
+ ``utf-8``) and a string containing the serialized
+ data.
+ :raises SerializerNotInstalled: If the serialization method
+ requested is not available.
+ """
+ if serializer == "raw":
+ return raw_encode(data)
+ if serializer and not self._encoders.get(serializer):
+ raise SerializerNotInstalled(
+ "No encoder installed for %s" % serializer)
+ # If a raw string was sent, assume binary encoding
+ # (it's likely either ASCII or a raw binary file, but 'binary'
+ # charset will encompass both, even if not ideal.
+ if not serializer and isinstance(data, str):
+ # In Python 3+, this would be "bytes"; allow binary data to be
+ # sent as a message without getting encoder errors
+ return "application/data", "binary", data
+ # For unicode objects, force it into a string
+ if not serializer and isinstance(data, unicode):
+ payload = data.encode("utf-8")
+ return "text/plain", "utf-8", payload
+ if serializer:
+ content_type, content_encoding, encoder = \
+ self._encoders[serializer]
+ else:
+ encoder = self._default_encode
+ content_type = self._default_content_type
+ content_encoding = self._default_content_encoding
+ payload = encoder(data)
+ return content_type, content_encoding, payload
+ def decode(self, data, content_type, content_encoding):
+ """Deserialize a data stream as serialized using ``encode``
+ based on :param:`content_type`.
+ :param data: The message data to deserialize.
+ :param content_type: The content-type of the data.
+ (e.g., ``application/json``).
+ :param content_encoding: The content-encoding of the data.
+ (e.g., ``utf-8``, ``binary``, or ``us-ascii``).
+ :returns: The unserialized data.
+ """
+ content_type = content_type or 'application/data'
+ content_encoding = (content_encoding or 'utf-8').lower()
+ # Don't decode 8-bit strings or unicode objects
+ if content_encoding not in ('binary', 'ascii-8bit') and \
+ not isinstance(data, unicode):
+ data = codecs.decode(data, content_encoding)
+ try:
+ decoder = self._decoders[content_type]
+ except KeyError:
+ return data
+ return decoder(data)
+.. data:: registry
+Global registry of serializers/deserializers.
+registry = SerializerRegistry()
+.. function:: encode(data, serializer=default_serializer)
+Encode data using the registry's default encoder.
+encode = registry.encode
+.. function:: decode(data, content_type, content_encoding):
+Decode data using the registry's default decoder.
+decode = registry.decode
+def raw_encode(data):
+ """Special case serializer."""
+ content_type = 'application/data'
+ payload = data
+ if isinstance(payload, unicode):
+ content_encoding = 'utf-8'
+ payload = payload.encode(content_encoding)
+ else:
+ content_encoding = 'binary'
+ return content_type, content_encoding, payload
+def register_json():
+ """Register a encoder/decoder for JSON serialization."""
+ from anyjson import serialize as json_serialize
+ from anyjson import deserialize as json_deserialize
+ registry.register('json', json_serialize, json_deserialize,
+ content_type='application/json',
+ content_encoding='utf-8')
+def register_yaml():
+ """Register a encoder/decoder for YAML serialization.
+ It is slower than JSON, but allows for more data types
+ to be serialized. Useful if you need to send data such as dates"""
+ try:
+ import yaml
+ registry.register('yaml', yaml.safe_dump, yaml.safe_load,
+ content_type='application/x-yaml',
+ content_encoding='utf-8')
+ except ImportError:
+ def not_available(*args, **kwargs):
+ """In case a client receives a yaml message, but yaml
+ isn't installed."""
+ raise SerializerNotInstalled(
+ "No decoder installed for YAML. Install the PyYAML library")
+ registry.register('yaml', None, not_available, 'application/x-yaml')
+def register_pickle():
+ """The fastest serialization method, but restricts
+ you to python clients."""
+ import cPickle
+ registry.register('pickle', cPickle.dumps, cPickle.loads,
+ content_type='application/x-python-serialize',
+ content_encoding='binary')
+def register_msgpack():
+ """See"""
+ try:
+ import msgpack
+ registry.register('msgpack', msgpack.packs, msgpack.unpacks,
+ content_type='application/x-msgpack',
+ content_encoding='utf-8')
+ except ImportError:
+ def not_available(*args, **kwargs):
+ """In case a client receives a msgpack message, but yaml
+ isn't installed."""
+ raise SerializerNotInstalled(
+ "No decoder installed for msgpack. "
+ "Install the msgpack library")
+ registry.register('msgpack', None, not_available,
+ 'application/x-msgpack')
+# Register the base serialization methods.
+# JSON is assumed to always be available, so is the default.
+# (this matches the historical use of kombu.)
diff --git a/kombu/ b/kombu/
new file mode 100644
index 00000000..37ba1aed
--- /dev/null
+++ b/kombu/
@@ -0,0 +1,66 @@
+from uuid import UUID, uuid4, _uuid_generate_random
+ import ctypes
+except ImportError:
+ ctypes = None
+def gen_unique_id():
+ """Generate a unique id, having - hopefully - a very small chance of
+ collission.
+ For now this is provided by :func:`uuid.uuid4`.
+ """
+ # Workaround for
+ if ctypes and _uuid_generate_random:
+ buffer = ctypes.create_string_buffer(16)
+ _uuid_generate_random(buffer)
+ return str(UUID(bytes=buffer.raw))
+ return str(uuid4())
+def _compat_rl_partition(S, sep, direction=None):
+ if direction is None:
+ direction = S.split
+ items = direction(sep, 1)
+ if len(items) == 1:
+ return items[0], sep, ''
+ return items[0], sep, items[1]
+def _compat_partition(S, sep):
+ """``partition(S, sep) -> (head, sep, tail)``
+ Search for the separator ``sep`` in ``S``, and return the part before
+ it, the separator itself, and the part after it. If the separator is not
+ found, return ``S`` and two empty strings.
+ """
+ return _compat_rl_partition(S, sep, direction=S.split)
+def _compat_rpartition(S, sep):
+ """``rpartition(S, sep) -> (tail, sep, head)``
+ Search for the separator ``sep`` in ``S``, starting at the end of ``S``,
+ and return the part before it, the separator itself, and the part
+ after it. If the separator is not found, return two empty
+ strings and ``S``.
+ """
+ return _compat_rl_partition(S, sep, direction=S.rsplit)
+def partition(S, sep):
+ if hasattr(S, 'partition'):
+ return S.partition(sep)
+ else: # Python <= 2.4:
+ return _compat_partition(S, sep)
+def rpartition(S, sep):
+ if hasattr(S, 'rpartition'):
+ return S.rpartition(sep)
+ else: # Python <= 2.4:
+ return _compat_rpartition(S, sep)
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 00000000..fd518807
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,16 @@
+verbosity = 1
+detailed-errors = 1
+source-dir = docs/
+build-dir = docs/.build
+all_files = 1
+upload-dir = docs/.build/html
+requires = python-anyjson
+ python-amqplib >= 0.6
diff --git a/ b/
new file mode 100644
index 00000000..581788ef
--- /dev/null
+++ b/
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+import codecs
+ from setuptools import setup, find_packages
+except ImportError:
+ from ez_setup import use_setuptools
+ use_setuptools()
+ from setuptools import setup, find_packages
+from distutils.command.install_data import install_data
+from distutils.command.install import INSTALL_SCHEMES
+import sys
+import kombu
+packages, data_files = [], []
+root_dir = os.path.dirname(__file__)
+if root_dir != '':
+ os.chdir(root_dir)
+src_dir = "kombu"
+def osx_install_data(install_data):
+ def finalize_options(self):
+ self.set_undefined_options("install", ("install_lib", "install_dir"))
+ install_data.finalize_options(self)
+def fullsplit(path, result=None):
+ if result is None:
+ result = []
+ head, tail = os.path.split(path)
+ if head == '':
+ return [tail] + result
+ if head == path:
+ return result
+ return fullsplit(head, [tail] + result)
+for scheme in INSTALL_SCHEMES.values():
+ scheme['data'] = scheme['purelib']
+for dirpath, dirnames, filenames in os.walk(src_dir):
+ # Ignore dirnames that start with '.'
+ for i, dirname in enumerate(dirnames):
+ if dirname.startswith("."):
+ del dirnames[i]
+ for filename in filenames:
+ if filename.endswith(".py"):
+ packages.append('.'.join(fullsplit(dirpath)))
+ else:
+ data_files.append([dirpath, [os.path.join(dirpath, f) for f in
+ filenames]])
+if os.path.exists("README.rst"):
+ long_description ='README.rst', "r", "utf-8").read()
+ long_description = "See"
+ name='kombu',
+ version=kombu.__version__,
+ description=kombu.__doc__,
+ author=kombu.__author__,
+ author_email=kombu.__contact__,
+ url=kombu.__homepage__,
+ platforms=["any"],
+ packages=packages,
+ data_files=data_files,
+ zip_safe=False,
+ test_suite="nose.collector",
+ install_requires=[
+ 'anyjson',
+ 'amqplib>=0.6',
+ ],
+ classifiers=[
+ "Development Status :: 4 - Beta",
+ "Framework :: Django",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python",
+ "License :: OSI Approved :: BSD License",
+ "Intended Audience :: Developers",
+ "Topic :: Communications",
+ "Topic :: System :: Distributed Computing",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ long_description=long_description,