summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS1
-rw-r--r--Changelog84
-rw-r--r--LICENSE1
-rw-r--r--README.rst5
-rw-r--r--docs/conf.py2
-rw-r--r--docs/reference/index.rst6
-rw-r--r--docs/reference/kombu.rst18
-rw-r--r--docs/reference/kombu.simple.rst4
-rw-r--r--docs/reference/kombu.transport.pika.rst46
-rw-r--r--docs/reference/kombu.transport.pika2.rst34
-rw-r--r--docs/reference/kombu.transport.zmq.rst4
-rw-r--r--docs/reference/kombu.transport.zookeeper.rst4
-rw-r--r--docs/reference/kombu.utils.eventio.rst11
-rw-r--r--docs/userguide/connections.rst7
-rw-r--r--docs/userguide/consumers.rst7
-rw-r--r--docs/userguide/producers.rst5
-rw-r--r--docs/userguide/serialization.rst28
-rw-r--r--docs/userguide/simple.rst2
-rw-r--r--examples/simple_receive.py1
-rw-r--r--examples/simple_task_queue/worker.py2
-rwxr-xr-xextra/release/bump_version.py3
-rwxr-xr-xextra/release/doc4allmods3
-rwxr-xr-xextra/release/flakeplus.py126
-rw-r--r--funtests/setup.py1
-rw-r--r--funtests/tests/test_pika.py38
-rw-r--r--funtests/transport.py2
-rw-r--r--kombu/__init__.py8
-rw-r--r--kombu/abstract.py12
-rw-r--r--kombu/clocks.py10
-rw-r--r--kombu/common.py9
-rw-r--r--kombu/compat.py3
-rw-r--r--kombu/compression.py3
-rw-r--r--kombu/connection.py36
-rw-r--r--kombu/entity.py34
-rw-r--r--kombu/exceptions.py3
-rw-r--r--kombu/messaging.py17
-rw-r--r--kombu/mixins.py3
-rw-r--r--kombu/pidbox.py8
-rw-r--r--kombu/pools.py22
-rw-r--r--kombu/serialization.py5
-rw-r--r--kombu/simple.py3
-rw-r--r--kombu/syn.py3
-rw-r--r--kombu/tests/mocks.py2
-rw-r--r--kombu/tests/test_clocks.py27
-rw-r--r--kombu/tests/test_common.py172
-rw-r--r--kombu/tests/test_connection.py174
-rw-r--r--kombu/tests/test_entities.py109
-rw-r--r--kombu/tests/test_messaging.py39
-rw-r--r--kombu/tests/test_pools.py35
-rw-r--r--kombu/tests/test_utils.py125
-rw-r--r--kombu/tests/transport/test_amqplib.py2
-rw-r--r--kombu/tests/transport/test_filesystem.py4
-rw-r--r--kombu/tests/transport/test_memory.py4
-rw-r--r--kombu/tests/transport/test_mongodb.py2
-rw-r--r--kombu/tests/transport/test_pyamqp.py26
-rw-r--r--kombu/tests/transport/test_redis.py33
-rw-r--r--kombu/tests/transport/test_sqlalchemy.py2
-rw-r--r--kombu/tests/transport/virtual/test_base.py29
-rw-r--r--kombu/tests/utilities/test_amq_manager.py35
-rw-r--r--kombu/tests/utilities/test_debug.py58
-rw-r--r--kombu/transport/SQS.py5
-rw-r--r--kombu/transport/__init__.py5
-rw-r--r--kombu/transport/amqplib.py9
-rw-r--r--kombu/transport/base.py7
-rw-r--r--kombu/transport/librabbitmq.py6
-rw-r--r--kombu/transport/memory.py3
-rw-r--r--kombu/transport/mongodb.py2
-rw-r--r--kombu/transport/pika.py262
-rw-r--r--kombu/transport/pika2.py235
-rw-r--r--kombu/transport/pyamqp.py5
-rw-r--r--kombu/transport/redis.py22
-rw-r--r--kombu/transport/virtual/__init__.py5
-rw-r--r--kombu/transport/virtual/exchange.py3
-rw-r--r--kombu/transport/virtual/scheduling.py3
-rw-r--r--kombu/transport/zmq.py23
-rw-r--r--kombu/transport/zookeeper.py71
-rw-r--r--kombu/utils/__init__.py11
-rw-r--r--kombu/utils/amq_manager.py6
-rw-r--r--kombu/utils/compat.py3
-rw-r--r--kombu/utils/debug.py3
-rw-r--r--kombu/utils/encoding.py3
-rw-r--r--kombu/utils/eventio.py9
-rw-r--r--kombu/utils/limits.py3
-rw-r--r--requirements/funtest.txt3
-rw-r--r--setup.cfg6
-rw-r--r--tox.ini2
86 files changed, 1241 insertions, 976 deletions
diff --git a/AUTHORS b/AUTHORS
index 7400df31..d5007ea8 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -46,6 +46,7 @@ Mher Movsisyan <mher.movsisyan@gmail.com>
Michael Barrett <mb@eventbrite.com>
Nitzan Miron <bug.assembla@bugbug.me>
Noah Kantrowitz <noah@coderanger.net>
+Ollie Walsh <ollie.walsh@geemail.kom>
Pascal Hartig <phartig@rdrei.net>
Patrick Schneider <patrick.p2k.schneider@gmail.com>
Paul McLanahan <paul@mclanahan.net>
diff --git a/Changelog b/Changelog
index cc7400f9..0edbc49e 100644
--- a/Changelog
+++ b/Changelog
@@ -8,7 +8,7 @@
2.5.0
=====
-:release-date: TBA
+:release-date: 2012-11-27 04:00 P.M UTC
- `py-amqp`_ is now the new default transport, replacing ``amqplib``.
@@ -27,7 +27,7 @@
be compiled. librabbitmq will be updated to support all the same
features as py-amqp.
-- Support for multiple connection URL's used for failover.
+- Support for using multiple connection URL's for failover.
The first argument to :class:`~kombu.Connection` can now be a list of
connection URLs:
@@ -62,7 +62,7 @@
- Now supports PyDev, PyCharm, pylint and other static code analysis tools.
-- :class:`~kombu.entity.Queue` now supports multiple bindings.
+- :class:`~kombu.Queue` now supports multiple bindings.
You can now have multiple bindings in the same queue by having
the second argument be a list:
@@ -79,8 +79,8 @@
To enable this, helper methods have been added:
- - :meth:`~kombu.entity.Queue.bind_to`
- - :meth:`~kombu.entity.Queue.unbind_from`
+ - :meth:`~kombu.Queue.bind_to`
+ - :meth:`~kombu.Queue.unbind_from`
Contributed by Rumyana Neykova.
@@ -95,7 +95,7 @@
the :mod:`kombu.common` module.
- Consumer now supports a ``on_message`` callback that can be used to process
- raw undecoded messages.
+ raw messages (not decoded).
Other callbacks specified using the ``callbacks`` argument, and
the ``receive`` method will be not be called when a on message callback
@@ -108,7 +108,7 @@
- Support for exchange-to-exchange bindings.
- The :class:`~kombu.entity.Exchange` entity gained ``bind_to``
+ The :class:`~kombu.Exchange` entity gained ``bind_to``
and ``unbind_from`` methods:
.. code-block:: python
@@ -123,32 +123,43 @@
Contributed by Rumyana Neykova.
-- Redis: Unacked message restore limit is now unlimited by default.
+.. _version-2.4.10:
- Also, the limit can now be configured using the ``unacked_restore_limit``
- transport option:
+2.4.10
+======
+:release-date: 2012-11-22 06:00 P.M UTC
- .. code-block:: python
+- The previous versions connection pool changes broke Redis support so that
+ it would always connect to localhost (default setting) no matter what
+ connection parameters were provided (Issue #176).
- Connection('redis://', transport_options={
- 'unacked_restore_limit': 100,
- })
+.. _version-2.4.9:
- A limit of 100 means that the consumer will restore at most 100
- messages at each pass.
+2.4.9
+=====
+:release-date: 2012-11-21 03:00 P.M UTC
-- Redis: Now uses a mutex to ensure only one consumer restores messages at a
- time.
+- Redis: Fixed race condition that could occur while trying to restore
+ messages (Issue #171).
- The mutex expires after 5 minutes by default, but can be configured
- using the ``unacked_mutex_expire`` transport option.
+ Fix contributed by Ollie Walsh.
- Redis: Each channel is now using a specific connection pool instance,
which is disconnected on connection failure.
-- amqplib: Fixed bug with timeouts when SSL is used in non-blocking mode.
+- ProducerPool: Fixed possible dead-lock in the acquire method.
- Fix contributed by Mher Movsisyan
+- ProducerPool: ``force_close_all`` no longer tries to call the non-existent
+ ``Producer._close``.
+
+- librabbitmq: Now implements ``transport.verify_connection`` so that
+ connection pools will not give back connections that are no longer working.
+
+- New and better ``repr()`` for Queue and Exchange objects.
+
+- Python3: Fixed problem with running the unit test suite.
+
+- Python3: Fixed problem with JSON codec.
.. _version-2.4.8:
@@ -156,14 +167,29 @@
=====
:release-date: 2012-11-02 05:00 P.M UTC
-- Redis: Fair queue cyle implementation improved (Issue #166).
+- Redis: Improved fair queue cycle implementation (Issue #166).
Contributed by Kevin McCarthy.
-- Redis: Number of messages to restore in one iteration is now unlimited,
- but can be configured using the unacked_restore_limit transport option.
+- Redis: Unacked message restore limit is now unlimited by default.
-- Redis: A Redis based mutex is now used while restoring messages.
+ Also, the limit can now be configured using the ``unacked_restore_limit``
+ transport option:
+
+ .. code-block:: python
+
+ Connection('redis://', transport_options={
+ 'unacked_restore_limit': 100,
+ })
+
+ A limit of 100 means that the consumer will restore at most 100
+ messages at each pass.
+
+- Redis: Now uses a mutex to ensure only one consumer restores messages at a
+ time.
+
+ The mutex expires after 5 minutes by default, but can be configured
+ using the ``unacked_mutex_expire`` transport option.
- LamportClock.adjust now returns the new clock value.
@@ -177,6 +203,10 @@
Fix contributed by Jasper Bryant-Greene
+- amqplib: Fixed bug with timeouts when SSL is used in non-blocking mode.
+
+ Fix contributed by Mher Movsisyan
+
.. _version-2.4.7:
@@ -627,6 +657,8 @@ News
Contributed by Mahendra M.
+.. _`Apache ZooKeeper`: http://zookeeper.apache.org/
+
- Redis: Priority support.
The message's ``priority`` field is now respected by the Redis
diff --git a/LICENSE b/LICENSE
index c453a3b2..3234c403 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,3 +1,4 @@
+Copyright (c) 2012 VMware, Inc. All rights reserved.
Copyright (c) 2009-2012, Ask Solem & contributors.
All rights reserved.
diff --git a/README.rst b/README.rst
index b40e5cb5..a2b84029 100644
--- a/README.rst
+++ b/README.rst
@@ -2,7 +2,7 @@
kombu - Messaging Framework for Python
========================================
-:Version: 2.5.0rc1
+:Version: 2.5.0
`Kombu` is a messaging framework for Python.
@@ -68,7 +68,6 @@ and the `Wikipedia article about AMQP`_.
.. _`Beanstalk`: http://kr.github.com/beanstalkd/
.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
.. _`amqplib`: http://barryp.org/software/py-amqplib/
-.. _`pika`: http://github.com/pika/pika
.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
.. _`carrot`: http://pypi.python.org/pypi/carrot/
.. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq
@@ -90,8 +89,6 @@ Transport Comparison
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
-| *pika* | Native | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
diff --git a/docs/conf.py b/docs/conf.py
index af7be574..27534746 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -46,7 +46,7 @@ exclude_trees = ['.build']
add_function_parentheses = True
# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'trac'
+pygments_style = 'colorful'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 1cdb4bc2..c09075a0 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -25,9 +25,6 @@
kombu.transport
kombu.transport.pyamqp
kombu.transport.librabbitmq
- kombu.transport.pika
- kombu.transport.pika2
- kombu.transport.amqplib
kombu.transport.memory
kombu.transport.redis
kombu.transport.zmq
@@ -43,15 +40,18 @@
kombu.transport.sqlalchemy
kombu.transport.sqlalchemy.models
kombu.transport.SQS
+ kombu.transport.amqplib
kombu.transport.base
kombu.transport.virtual
kombu.transport.virtual.exchange
kombu.transport.virtual.scheduling
kombu.serialization
kombu.utils
+ kombu.utils.eventio
kombu.utils.limits
kombu.utils.compat
kombu.utils.debug
kombu.utils.encoding
kombu.utils.functional
kombu.utils.url
+ kombu.utils.amq_manager
diff --git a/docs/reference/kombu.rst b/docs/reference/kombu.rst
index 71aa62ec..46fceeaf 100644
--- a/docs/reference/kombu.rst
+++ b/docs/reference/kombu.rst
@@ -1,11 +1,11 @@
.. currentmodule:: kombu
-.. automodule:: kombu
+.. contents::
+ :local:
- .. contents::
- :local:
+.. automodule:: kombu
- Connection
+ Connection
----------
.. autoclass:: Connection
@@ -73,8 +73,6 @@
.. automethod:: SimpleQueue
.. automethod:: SimpleBuffer
-
-
Exchange
--------
@@ -110,9 +108,9 @@
Example creating a queue using our exchange in the :class:`Exchange`
example::
- >>> science_news = Queue("science_news",
+ >>> science_news = Queue('science_news',
... exchange=news_exchange,
- ... routing_key="news.science")
+ ... routing_key='news.science')
For now `science_news` is just a declaration, you can't perform
actions on it. It just describes the name and options for the queue.
@@ -124,7 +122,9 @@
>>> bound_science_news = science_news(channel)
- Now you can perform operations like :meth:`declare` or :meth:`purge`::
+ Now you can perform operations like :meth:`declare` or :meth:`purge`:
+
+ .. code-block:: python
>>> bound_sicence_news.declare()
>>> bound_science_news.purge()
diff --git a/docs/reference/kombu.simple.rst b/docs/reference/kombu.simple.rst
index bee729df..b43e2ec8 100644
--- a/docs/reference/kombu.simple.rst
+++ b/docs/reference/kombu.simple.rst
@@ -28,7 +28,7 @@
.. attribute:: queue
- :class:`~kombu.entity.Queue` to consume from (if consuming).
+ :class:`~kombu.Queue` to consume from (if consuming).
.. attribute:: queue_opts
@@ -69,7 +69,7 @@
.. attribute:: queue
- :class:`~kombu.entity.Queue` to consume from (if consuming).
+ :class:`~kombu.Queue` to consume from (if consuming).
.. attribute:: queue_opts
diff --git a/docs/reference/kombu.transport.pika.rst b/docs/reference/kombu.transport.pika.rst
deleted file mode 100644
index 783af0d6..00000000
--- a/docs/reference/kombu.transport.pika.rst
+++ /dev/null
@@ -1,46 +0,0 @@
-=======================================
- kombu.transport.pika
-=======================================
-
-.. currentmodule:: kombu.transport.pika
-
-.. automodule:: kombu.transport.pika
-
- .. contents::
- :local:
-
- Transports
- ----------
-
- .. autoclass:: AsyncoreTransport
- :members:
- :undoc-members:
-
- .. autoclass:: SyncTransport
- :members:
- :undoc-members:
-
- Connections
- -----------
-
- .. autoclass:: AsyncoreConnection
- :members:
- :undoc-members:
-
- .. autoclass:: BlockingConnection
- :members:
- :undoc-members:
-
- Channel
- -------
-
- .. autoclass:: Channel
- :members:
- :undoc-members:
-
- Message
- -------
-
- .. autoclass:: Message
- :members:
- :undoc-members:
diff --git a/docs/reference/kombu.transport.pika2.rst b/docs/reference/kombu.transport.pika2.rst
deleted file mode 100644
index b06dd0a7..00000000
--- a/docs/reference/kombu.transport.pika2.rst
+++ /dev/null
@@ -1,34 +0,0 @@
-.. currentmodule:: kombu.transport.pika2
-
-.. automodule:: kombu.transport.pika2
-
- .. contents::
- :local:
-
- Transport
- ---------
-
- .. autoclass:: Transport
- :members:
- :undoc-members:
-
- Connection
- ----------
-
- .. autoclass:: Connection
- :members:
- :undoc-members:
-
- Channel
- -------
-
- .. autoclass:: Channel
- :members:
- :undoc-members:
-
- Message
- -------
-
- .. autoclass:: Message
- :members:
- :undoc-members:
diff --git a/docs/reference/kombu.transport.zmq.rst b/docs/reference/kombu.transport.zmq.rst
index 0735840b..08d0ea5e 100644
--- a/docs/reference/kombu.transport.zmq.rst
+++ b/docs/reference/kombu.transport.zmq.rst
@@ -1,3 +1,7 @@
+=====================
+ kombu.transport.zmq
+=====================
+
.. currentmodule:: kombu.transport.zmq
.. automodule:: kombu.transport.zmq
diff --git a/docs/reference/kombu.transport.zookeeper.rst b/docs/reference/kombu.transport.zookeeper.rst
index 976ad6af..af900a35 100644
--- a/docs/reference/kombu.transport.zookeeper.rst
+++ b/docs/reference/kombu.transport.zookeeper.rst
@@ -1,3 +1,7 @@
+===========================
+ kombu.transport.zookeeper
+===========================
+
.. currentmodule:: kombu.transport.zookeeper
.. automodule:: kombu.transport.zookeeper
diff --git a/docs/reference/kombu.utils.eventio.rst b/docs/reference/kombu.utils.eventio.rst
new file mode 100644
index 00000000..16c40f33
--- /dev/null
+++ b/docs/reference/kombu.utils.eventio.rst
@@ -0,0 +1,11 @@
+==========================================================
+ Evented I/O - kombu.utils.eventio
+==========================================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.utils.eventio
+
+.. automodule:: kombu.utils.eventio
+ :members:
+ :undoc-members:
diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst
index d7275ea4..aa88add8 100644
--- a/docs/userguide/connections.rst
+++ b/docs/userguide/connections.rst
@@ -116,10 +116,7 @@ keyword arguments, these are:
:ssl: Use SSL to connect to the server. Default is ``False``.
Only supported by the amqp transport.
:insist: Insist on connecting to a server.
- In a configuration with multiple load-sharing servers, the insist
- option tells the server that the client is insisting on a connection
- to the specified server. Default is ``False``.
- Only supported by the amqp and pika transports, and not by AMQP 0-9-1.
+ *No longer supported, relic from AMQP 0.8*
:connect_timeout: Timeout in seconds for connecting to the
server. May not be supported by the specified transport.
:transport_options: A dict of additional connection arguments to
@@ -142,8 +139,6 @@ Transport Comparison
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
-| *pika* | Native | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst
index 74ab79cf..5e9d3d61 100644
--- a/docs/userguide/consumers.rst
+++ b/docs/userguide/consumers.rst
@@ -1,4 +1,3 @@
-.. currentmodule:: kombu.messaging
.. _guide-consumers:
===========
@@ -62,7 +61,7 @@ and with multiple channels again:
.. code-block:: python
- from kombu.messaging import Consumer
+ from kombu import Consumer
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
@@ -88,8 +87,6 @@ and with multiple channels again:
Reference
=========
-.. module:: kombu.messaging
-
-.. autoclass:: Consumer
+.. autoclass:: kombu.Consumer
:noindex:
:members:
diff --git a/docs/userguide/producers.rst b/docs/userguide/producers.rst
index c6e80625..454b4ca3 100644
--- a/docs/userguide/producers.rst
+++ b/docs/userguide/producers.rst
@@ -1,4 +1,3 @@
-.. currentmodule:: kombu.messaging
.. _guide-producers:
===========
@@ -20,8 +19,6 @@ See :ref:`guide-serialization`.
Reference
=========
-.. module:: kombu.messaging
-
-.. autoclass:: Producer
+.. autoclass:: kombu.Producer
:noindex:
:members:
diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst
index b3648d14..3f236861 100644
--- a/docs/userguide/serialization.rst
+++ b/docs/userguide/serialization.rst
@@ -138,24 +138,24 @@ supported by Kombu.
.. admonition:: Buffer Objects
-The decoder function of custom serializer must support both strings
-and Python's old-style buffer objects.
+ The decoder function of custom serializer must support both strings
+ and Python's old-style buffer objects.
-Python pickle and json modules usually don't do this via its ``loads``
-function, but you can easily add support by making a wrapper around the
-``load`` function that takes file objects instead of strings.
+ Python pickle and json modules usually don't do this via its ``loads``
+ function, but you can easily add support by making a wrapper around the
+ ``load`` function that takes file objects instead of strings.
-Here's an example wrapping :func:`pickle.loads` in such a way:
+ Here's an example wrapping :func:`pickle.loads` in such a way:
-.. code-block:: python
+ .. code-block:: python
- import pickle
- from kombu.serialization import BytesIO, register
+ import pickle
+ from kombu.serialization import BytesIO, register
- def loads(s):
- return pickle.load(BytesIO(s))
+ def loads(s):
+ return pickle.load(BytesIO(s))
- register('my_pickle', loads, pickle.dumps,
- content_type='application/x-pickle2',
- content_encoding='binary')
+ register('my_pickle', loads, pickle.dumps,
+ content_type='application/x-pickle2',
+ content_encoding='binary')
diff --git a/docs/userguide/simple.rst b/docs/userguide/simple.rst
index 9239f118..69caaf2d 100644
--- a/docs/userguide/simple.rst
+++ b/docs/userguide/simple.rst
@@ -16,7 +16,7 @@ messaging needs.
Instead of defining exchanges and queues, the simple classes only requires
two arguments, a connection channel and a name. The name is used as the
queue, exchange and routing key. If the need arises, you can specify
-a :class:`~kombu.entity.Queue` as the name argument instead.
+a :class:`~kombu.Queue` as the name argument instead.
In addition, the :class:`~kombu.Connection` comes with
shortcuts to create simple queues using the current connection::
diff --git a/examples/simple_receive.py b/examples/simple_receive.py
index e15427e3..906c2742 100644
--- a/examples/simple_receive.py
+++ b/examples/simple_receive.py
@@ -1,6 +1,7 @@
"""
Example receiving a message using the SimpleQueue interface.
"""
+
from kombu import Connection
#: Create connection
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index 807542e9..d9fb691c 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -23,7 +23,7 @@ class Worker(ConsumerMixin):
logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
try:
fun(*args, **kwdict(kwargs))
- except Exception as exc:
+ except Exception, exc:
logger.error('task raised exception: %r', exc)
message.ack()
diff --git a/extra/release/bump_version.py b/extra/release/bump_version.py
index d7b9c6fd..abda578e 100755
--- a/extra/release/bump_version.py
+++ b/extra/release/bump_version.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python
-
from __future__ import absolute_import
import errno
@@ -22,7 +21,7 @@ def cmd(*args):
def no_enoent():
try:
yield
- except OSError as exc:
+ except OSError, exc:
if exc.errno != errno.ENOENT:
raise
diff --git a/extra/release/doc4allmods b/extra/release/doc4allmods
index 4651dcd4..f95fee2f 100755
--- a/extra/release/doc4allmods
+++ b/extra/release/doc4allmods
@@ -2,7 +2,8 @@
PACKAGE="$1"
SKIP_PACKAGES="$PACKAGE tests management urls"
-SKIP_FILES="kombu.utils.eventio.rst
+SKIP_FILES="kombu.entity.rst
+ kombu.messaging.rst
kombu.transport.django.migrations.rst
kombu.transport.django.migrations.0001_initial.rst
kombu.transport.django.management.rst
diff --git a/extra/release/flakeplus.py b/extra/release/flakeplus.py
new file mode 100755
index 00000000..6fe1f1fc
--- /dev/null
+++ b/extra/release/flakeplus.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import os
+import re
+import sys
+
+from collections import defaultdict
+from unipath import Path
+
+RE_COMMENT = r'^\s*\#'
+RE_NOQA = r'.+?\#\s+noqa+'
+RE_MULTILINE_COMMENT_O = r'^\s*(?:\'\'\'|""").+?(?:\'\'\'|""")'
+RE_MULTILINE_COMMENT_S = r'^\s*(?:\'\'\'|""")'
+RE_MULTILINE_COMMENT_E = r'(?:^|.+?)(?:\'\'\'|""")'
+RE_WITH = r'(?:^|\s+)with\s+'
+RE_WITH_IMPORT = r'''from\s+ __future__\s+ import\s+ with_statement'''
+RE_PRINT = r'''(?:^|\s+)print\((?:"|')(?:\W+?)?[A-Z0-9:]{2,}'''
+RE_ABS_IMPORT = r'''from\s+ __future__\s+ import\s+ absolute_import'''
+
+acc = defaultdict(lambda: {"abs": False, "print": False})
+
+
+def compile(regex):
+ return re.compile(regex, re.VERBOSE)
+
+
+class FlakePP(object):
+ re_comment = compile(RE_COMMENT)
+ re_ml_comment_o = compile(RE_MULTILINE_COMMENT_O)
+ re_ml_comment_s = compile(RE_MULTILINE_COMMENT_S)
+ re_ml_comment_e = compile(RE_MULTILINE_COMMENT_E)
+ re_abs_import = compile(RE_ABS_IMPORT)
+ re_print = compile(RE_PRINT)
+ re_with_import = compile(RE_WITH_IMPORT)
+ re_with = compile(RE_WITH)
+ re_noqa = compile(RE_NOQA)
+ map = {"abs": True, "print": False,
+ "with": False, "with-used": False}
+
+ def __init__(self, verbose=False):
+ self.verbose = verbose
+ self.steps = (("abs", self.re_abs_import),
+ ("with", self.re_with_import),
+ ("with-used", self.re_with),
+ ("print", self.re_print))
+
+ def analyze_fh(self, fh):
+ steps = self.steps
+ filename = fh.name
+ acc = dict(self.map)
+ index = 0
+ errors = [0]
+
+ def error(fmt, **kwargs):
+ errors[0] += 1
+ self.announce(fmt, **dict(kwargs, filename=filename))
+
+ for index, line in enumerate(self.strip_comments(fh)):
+ for key, pattern in steps:
+ if pattern.match(line):
+ acc[key] = True
+ if index:
+ if not acc["abs"]:
+ error("%(filename)s: missing abs import")
+ if acc["with-used"] and not acc["with"]:
+ error("%(filename)s: missing with import")
+ if acc["print"]:
+ error("%(filename)s: left over print statement")
+
+ return filename, errors[0], acc
+
+ def analyze_file(self, filename):
+ with open(filename) as fh:
+ return self.analyze_fh(fh)
+
+ def analyze_tree(self, dir):
+ for dirpath, _, filenames in os.walk(dir):
+ for path in (Path(dirpath, f) for f in filenames):
+ if path.endswith(".py"):
+ yield self.analyze_file(path)
+
+ def analyze(self, *paths):
+ for path in map(Path, paths):
+ if path.isdir():
+ for res in self.analyze_tree(path):
+ yield res
+ else:
+ yield self.analyze_file(path)
+
+ def strip_comments(self, fh):
+ re_comment = self.re_comment
+ re_ml_comment_o = self.re_ml_comment_o
+ re_ml_comment_s = self.re_ml_comment_s
+ re_ml_comment_e = self.re_ml_comment_e
+ re_noqa = self.re_noqa
+ in_ml = False
+
+ for line in fh.readlines():
+ if in_ml:
+ if re_ml_comment_e.match(line):
+ in_ml = False
+ else:
+ if re_noqa.match(line) or re_ml_comment_o.match(line):
+ pass
+ elif re_ml_comment_s.match(line):
+ in_ml = True
+ elif re_comment.match(line):
+ pass
+ else:
+ yield line
+
+ def announce(self, fmt, **kwargs):
+ sys.stderr.write((fmt + "\n") % kwargs)
+
+
+def main(argv=sys.argv, exitcode=0):
+ for _, errors, _ in FlakePP(verbose=True).analyze(*argv[1:]):
+ if errors:
+ exitcode = 1
+ return exitcode
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/funtests/setup.py b/funtests/setup.py
index b024edff..fdb7a98c 100644
--- a/funtests/setup.py
+++ b/funtests/setup.py
@@ -53,7 +53,6 @@ setup(
"pymongo",
"couchdb",
"kazoo",
- "pika",
"beanstalkc",
"kombu-sqlalchemy",
"django",
diff --git a/funtests/tests/test_pika.py b/funtests/tests/test_pika.py
deleted file mode 100644
index 4dd357b6..00000000
--- a/funtests/tests/test_pika.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from funtests import transport
-from nose import SkipTest
-
-from kombu.exceptions import VersionMismatch
-
-
-class test_pika_blocking(transport.TransportCase):
- transport = "syncpika"
- prefix = "syncpika"
-
- def before_connect(self):
- try:
- from kombu.transport import pika
- except VersionMismatch:
- raise SkipTest("Pika version mismatch")
-
- def test_produce__consume_large_messages(self, *args, **kwargs):
- raise SkipTest("test currently fails for sync pika")
-
- def test_cyclic_reference_channel(self, *args, **kwargs):
- raise SkipTest("known memory leak")
-
-
-class test_pika_async(transport.TransportCase):
- transport = "pika"
- prefix = "pika"
-
- def before_connect(self):
- try:
- from kombu.transport import pika
- except VersionMismatch:
- raise SkipTest("Pika version mismatch")
-
- def test_produce__consume_large_messages(self, *args, **kwargs):
- raise SkipTest("test currently fails for async pika")
-
- def test_cyclic_reference_channel(self, *args, **kwargs):
- raise SkipTest("known memory leak")
diff --git a/funtests/transport.py b/funtests/transport.py
index 4df10738..5924a72a 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -79,7 +79,7 @@ class TransportCase(unittest.TestCase):
if self.transport:
try:
self.before_connect()
- except SkipTest as exc:
+ except SkipTest, exc:
self.skip_test_reason = str(exc)
else:
self.do_connect()
diff --git a/kombu/__init__.py b/kombu/__init__.py
index ed9ca1e8..0a482802 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -1,7 +1,7 @@
"""Messaging Framework for Python"""
from __future__ import absolute_import
-VERSION = (2, 5, 0, 'rc1')
+VERSION = (2, 5, 0)
__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
@@ -23,7 +23,7 @@ if sys.version_info < (2, 5): # pragma: no cover
STATICA_HACK = True
globals()['kcah_acitats'[::-1].upper()] = False
-if STATICA_HACK:
+if STATICA_HACK: # pragma: no cover
# This is never executed, but tricks static analyzers (PyDev, PyCharm,
# pylint, etc.) into knowing the types of these symbols, and what
# they contain.
@@ -74,7 +74,7 @@ class module(ModuleType):
# 2.5 does not define __package__
try:
package = __package__
-except NameError:
+except NameError: # pragma: no cover
package = 'kombu'
# keep a reference to this module so that it's not garbage collected
@@ -94,7 +94,7 @@ new_module.__dict__.update({
'__package__': package,
'VERSION': VERSION})
-if os.environ.get('KOMBU_LOG_DEBUG'):
+if os.environ.get('KOMBU_LOG_DEBUG'): # pragma: no cover
os.environ.update(KOMBU_LOG_CHANNEL='1', KOMBU_LOG_CONNECTION='1')
from .utils import debug
debug.setup_logging()
diff --git a/kombu/abstract.py b/kombu/abstract.py
index f0968c1a..382505dd 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -4,9 +4,6 @@ kombu.compression
Object utilities.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -14,7 +11,6 @@ from copy import copy
from .connection import maybe_channel
from .exceptions import NotBoundError
-from .five import items
from .utils import ChannelPromise
__all__ = ['Object', 'MaybeChannelBound']
@@ -41,11 +37,6 @@ class Object(object):
except AttributeError:
setattr(self, name, None)
- def setdefault(self, **defaults):
- for key, value in items(defaults):
- if getattr(self, key) is None:
- setattr(self, key, value)
-
def as_dict(self, recurse=False):
def f(obj):
if recurse and isinstance(obj, Object):
@@ -101,7 +92,8 @@ class MaybeChannelBound(Object):
def __repr__(self, item=''):
item = item or type(self).__name__
if self.is_bound:
- return '<bound {0} of {1}>'.format(item, self.channel)
+ return '<{0} bound to chan:{1}>'.format(
+ item or type(self).__name__, self.channel.channel_id)
return '<unbound {0}>'.format(item)
@property
diff --git a/kombu/clocks.py b/kombu/clocks.py
index 1afff225..8d85bf21 100644
--- a/kombu/clocks.py
+++ b/kombu/clocks.py
@@ -4,9 +4,6 @@ kombu.clocks
Logical Clocks and Synchronization.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -71,9 +68,6 @@ class LamportClock(object):
self.value += 1
return self.value
- def sort(self, d):
- return d[sorted(d)[0]]
-
def sort_heap(self, h):
"""List of tuples containing at least two elements, representing
an event, where the first element is the event's scalar clock value,
@@ -89,13 +83,13 @@ class LamportClock(object):
"""
if h[0][0] == h[1][0]:
same = []
- for i, PN in zip(h, islice(h, 1, None)):
+ for PN in zip(h, islice(h, 1, None)):
if PN[0][0] != PN[1][0]:
break # Prev and Next's clocks differ
same.append(PN[0])
# return first item sorted by process id
return sorted(same, key=lambda event: event[1])[0]
- # all clock values unique, return first item
+ # clock values unique, return first item
return h[0]
def __str__(self):
diff --git a/kombu/common.py b/kombu/common.py
index 927f4829..c6f9a6e0 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -4,9 +4,6 @@ kombu.common
Common Utilities.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -67,7 +64,7 @@ class Broadcast(Queue):
:keyword queue: By default a unique id is used for the queue
name for every consumer. You can specify a custom queue
name here.
- :keyword \*\*kwargs: See :class:`~kombu.entity.Queue` for a list
+ :keyword \*\*kwargs: See :class:`~kombu.Queue` for a list
of additional keyword arguments supported.
"""
@@ -169,9 +166,9 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
try:
yield conn.drain_events(timeout=timeout)
except socket.timeout:
- if timeout and not ignore_timeouts:
+ if timeout and not ignore_timeouts: # pragma: no cover
raise
- except socket.error:
+ except socket.error: # pragma: no cover
pass
diff --git a/kombu/compat.py b/kombu/compat.py
index bdd876b9..7a588880 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -6,9 +6,6 @@ Carrot compatible interface for :class:`Publisher` and :class:`Producer`.
See http://packages.python.org/pypi/carrot for documentation.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/compression.py b/kombu/compression.py
index c76adfc8..34daa7a2 100644
--- a/kombu/compression.py
+++ b/kombu/compression.py
@@ -4,9 +4,6 @@ kombu.compression
Compression utilities.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/connection.py b/kombu/connection.py
index e23be740..c4c11ac2 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -4,9 +4,6 @@ kombu.connection
Broker connection and pools.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -146,9 +143,9 @@ class Connection(object):
failover_strategy = 'round-robin'
#: Heartbeat value, currently only supported by the py-amqp transport.
- hertbeat = None
+ heartbeat = None
- userid = password = ssl = login_method = None
+ hostname = userid = password = ssl = login_method = None
def __init__(self, hostname='localhost', userid=None,
password=None, virtual_host=None, port=None, insist=False,
@@ -293,7 +290,7 @@ class Connection(object):
except socket.timeout:
self.more_to_read = False
return False
- except socket.error as exc:
+ except socket.error, exc:
if exc.errno in (errno.EAGAIN, errno.EINTR):
self.more_to_read = False
return False
@@ -383,8 +380,8 @@ class Connection(object):
self.maybe_close_channel(self._default_channel)
self._default_channel = None
- def _default_ensure_callback(exc, interval):
- logger.error("Ensure: Couldn't send message: %r. Retry in %ss",
+ def _default_ensure_callback(self, exc, interval):
+ logger.error("Ensure: Operation error: %r. Retry in %ss",
exc, interval, exc_info=True)
def ensure(self, obj, fun, errback=None, max_retries=None,
@@ -427,7 +424,7 @@ class Connection(object):
for retries in count(0): # for infinity
try:
return fun(*args, **kwargs)
- except self.recoverable_connection_errors as exc:
+ except self.recoverable_connection_errors, exc:
if got_connection:
raise
if max_retries is not None and retries > max_retries:
@@ -450,7 +447,7 @@ class Connection(object):
if on_revive:
on_revive(new_channel)
got_connection += 1
- except self.recoverable_channel_errors as exc:
+ except self.recoverable_channel_errors, exc:
if max_retries is not None and retries > max_retries:
raise
self._debug('ensure channel error: %r', exc, exc_info=1)
@@ -651,14 +648,14 @@ class Connection(object):
created using that name as the name of the queue and exchange,
also it will be used as the default routing key.
- :param name: Name of the queue/or a :class:`~kombu.entity.Queue`.
+ :param name: Name of the queue/or a :class:`~kombu.Queue`.
:keyword no_ack: Disable acknowledgements. Default is false.
:keyword queue_opts: Additional keyword arguments passed to the
constructor of the automatically created
- :class:`~kombu.entity.Queue`.
+ :class:`~kombu.Queue`.
:keyword exchange_opts: Additional keyword arguments passed to the
constructor of the automatically created
- :class:`~kombu.entity.Exchange`.
+ :class:`~kombu.Exchange`.
:keyword channel: Channel to use. If not specified a new channel
from the current connection will be used. Remember to call
:meth:`~kombu.simple.SimpleQueue.close` when done with the
@@ -856,7 +853,11 @@ class Resource(object):
except Empty:
self._add_when_empty()
else:
- R = self.prepare(R)
+ try:
+ R = self.prepare(R)
+ except BaseException:
+ self.release(R)
+ raise
self._dirty.add(R)
break
else:
@@ -932,7 +933,7 @@ class Resource(object):
except AttributeError:
pass # Issue #78
finally:
- if mutex:
+ if mutex: # pragma: no cover
mutex.release()
if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover
@@ -974,7 +975,10 @@ class ConnectionPool(Resource):
return self.connection.clone()
def release_resource(self, resource):
- resource._debug('released')
+ try:
+ resource._debug('released')
+ except AttributeError:
+ pass
def close_resource(self, resource):
resource._close()
diff --git a/kombu/entity.py b/kombu/entity.py
index e7ec141c..84a7a445 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -4,9 +4,6 @@ kombu.entity
Exchange and Queue declarations.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -266,8 +263,10 @@ class Exchange(MaybeChannelBound):
return False
def __repr__(self):
- return super(Exchange, self).__repr__('Exchange %s(%s)' % (self.name,
- self.type))
+ return super(Exchange, self).__repr__(str(self))
+
+ def __str__(self):
+ return 'Exchange %s(%s)' % (self.name, self.type)
@property
def can_cache_declaration(self):
@@ -289,12 +288,13 @@ class binding(object):
self.exchange = exchange
self.routing_key = routing_key
self.arguments = arguments
- self.unbind_arguments = None
+ self.unbind_arguments = unbind_arguments
def declare(self, channel, nowait=False):
"""Declare destination exchange."""
if self.exchange and self.exchange.name:
- self.exchange(channel).declare(nowait=nowait)
+ ex = self.exchange(channel)
+ ex.declare(nowait=nowait)
def bind(self, entity, nowait=False):
"""Bind entity to this binding."""
@@ -305,11 +305,14 @@ class binding(object):
def unbind(self, entity, nowait=False):
"""Unbind entity from this binding."""
- entity.unbind_from(exchange=self.exchange,
+ entity.unbind_from(self.exchange,
routing_key=self.routing_key,
arguments=self.unbind_arguments,
nowait=nowait)
+ def __repr__(self):
+ return '<binding: %r -> %r>' % (self.exchange.name, self.routing_key)
+
class Queue(MaybeChannelBound):
"""A Queue declaration.
@@ -423,19 +426,22 @@ class Queue(MaybeChannelBound):
('exclusive', bool),
('auto_delete', bool),
('no_ack', None),
- ('alias', None))
+ ('alias', None),
+ ('bindings', None))
def __init__(self, name='', exchange=None, routing_key='', channel=None,
- **kwargs):
+ bindings=None, **kwargs):
super(Queue, self).__init__(**kwargs)
self.name = name or self.name
self.exchange = exchange or self.exchange
self.routing_key = routing_key or self.routing_key
- self.bindings = set()
+ self.bindings = set(bindings or [])
# allows Queue('name', [binding(...), binding(...), ...])
if isinstance(exchange, (list, tuple, set)):
- self.bindings, self.exchange = set(exchange), None
+ self.bindings |= set(exchange)
+ if self.bindings:
+ self.exchange = None
# exclusive implies auto-delete.
if self.exclusive:
@@ -457,9 +463,7 @@ class Queue(MaybeChannelBound):
self.exchange.declare(nowait)
self.queue_declare(nowait, passive=False)
- if self.name:
- # name should be set by queue_declare in the case
- # of anonymous queues.
+ if self.exchange is not None:
self.queue_bind(nowait)
# - declare extra/multi-bindings.
diff --git a/kombu/exceptions.py b/kombu/exceptions.py
index dadc740a..2bf57f37 100644
--- a/kombu/exceptions.py
+++ b/kombu/exceptions.py
@@ -4,9 +4,6 @@ kombu.exceptions
Exceptions.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 7b16628a..e2ee2d6e 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -4,9 +4,6 @@ kombu.messaging
Sending and receiving messages.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -202,10 +199,12 @@ class Producer(object):
def revive(self, channel):
"""Revive the producer after connection loss."""
if is_connection(channel):
- promise = ChannelPromise(lambda: channel.default_channel)
- self.__connection__ = channel
- self._channel = promise
- self.exchange = self.exchange(promise)
+ connection = channel
+ self.__connection__ = connection
+ channel = ChannelPromise(lambda: connection.default_channel)
+ if isinstance(channel, ChannelPromise):
+ self._channel = channel
+ self.exchange = self.exchange(channel)
else:
# Channel already concrete
self._channel = channel
@@ -277,7 +276,7 @@ class Consumer(object):
#: The connection/channel to use for this consumer.
channel = None
- #: A single :class:`~kombu.entity.Queue`, or a list of queues to
+ #: A single :class:`~kombu.Queue`, or a list of queues to
#: consume from.
queues = None
@@ -536,7 +535,7 @@ class Consumer(object):
if m2p:
message = m2p(message)
decoded = None if on_m else message.decode()
- except Exception as exc:
+ except Exception, exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
diff --git a/kombu/mixins.py b/kombu/mixins.py
index 818b5563..5b0dcbf4 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -4,9 +4,6 @@ kombu.mixins
Useful mixin classes.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index e3998fd4..3723a4a1 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -4,9 +4,6 @@ kombu.pidbox
Generic process mailbox.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -18,11 +15,10 @@ from itertools import count
from threading import local
from time import time
+from . import Exchange, Queue, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
-from .entity import Exchange, Queue
from .five import range
-from .messaging import Consumer, Producer
from .utils import cached_property, kwdict, uuid
REPLY_QUEUE_EXPIRES = 10
@@ -83,7 +79,7 @@ class Node(object):
reply = handle(method, kwdict(arguments))
except SystemExit:
raise
- except Exception as exc:
+ except Exception, exc:
reply = {'error': repr(exc)}
if reply_to:
diff --git a/kombu/pools.py b/kombu/pools.py
index 0cd03c41..0aba7dc1 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -4,9 +4,6 @@ kombu.pools
Public resource pools.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -41,7 +38,12 @@ class ProducerPool(Resource):
return self.connections.acquire(block=True)
def create_producer(self):
- return self.Producer(self._acquire_connection())
+ conn = self._acquire_connection()
+ try:
+ return self.Producer(conn)
+ except BaseException:
+ conn.release()
+ raise
def new(self):
return lambda: self.create_producer()
@@ -51,11 +53,19 @@ class ProducerPool(Resource):
for _ in range(self.limit):
self._resource.put_nowait(self.new())
+ def close_resource(self, resource):
+ pass
+
def prepare(self, p):
if isinstance(p, Callable):
p = p()
- if not p.channel:
- p.revive(self._acquire_connection())
+ if p._channel is None:
+ conn = self._acquire_connection()
+ try:
+ p.revive(conn)
+ except BaseException:
+ conn.release()
+ raise
return p
def release(self, resource):
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 391c6a22..b1256164 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -5,9 +5,6 @@ kombu.serialization
Serialization utilities.
-:copyright: (c) 2009 - 2012 by Ask Solem
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -287,7 +284,7 @@ def register_json():
from anyjson import loads, dumps
def _loads(obj):
- if isinstance(obj, bytes):
+ if isinstance(obj, bytes_t):
obj = obj.decode()
return loads(obj)
diff --git a/kombu/simple.py b/kombu/simple.py
index 89d756a2..5f8a3e90 100644
--- a/kombu/simple.py
+++ b/kombu/simple.py
@@ -4,9 +4,6 @@ kombu.simple
Simple interface.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/syn.py b/kombu/syn.py
index 7983f41c..7f6e8099 100644
--- a/kombu/syn.py
+++ b/kombu/syn.py
@@ -2,9 +2,6 @@
kombu.syn
=========
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index 6e1be5eb..c3f1e882 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -22,6 +22,7 @@ class Message(base.Message):
class Channel(base.StdChannel):
open = True
throw_decode_error = False
+ _ids = count(1)
def __init__(self, connection):
self.connection = connection
@@ -29,6 +30,7 @@ class Channel(base.StdChannel):
self.deliveries = count(1)
self.to_deliver = []
self.events = {'basic_return': set()}
+ self.channel_id = next(self._ids)
def _called(self, name):
self.called.append(name)
diff --git a/kombu/tests/test_clocks.py b/kombu/tests/test_clocks.py
index 27b68f93..ed8c9fa8 100644
--- a/kombu/tests/test_clocks.py
+++ b/kombu/tests/test_clocks.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+from heapq import heappush
+
from kombu.clocks import LamportClock
from .utils import TestCase
@@ -17,6 +19,7 @@ class test_LamportClock(TestCase):
c1.forward()
c2.adjust(c1.value)
self.assertEqual(c2.value, c1.value + 1)
+ self.assertTrue(repr(c1))
c2_val = c2.value
c2.forward()
@@ -26,3 +29,27 @@ class test_LamportClock(TestCase):
c1.adjust(c2.value)
self.assertEqual(c1.value, c2.value + 1)
+
+ def test_sort(self):
+ c = LamportClock()
+ pid1 = 'a.example.com:312'
+ pid2 = 'b.example.com:311'
+
+ events = []
+
+ m1 = (c.forward(), pid1)
+ heappush(events, m1)
+ m2 = (c.forward(), pid2)
+ heappush(events, m2)
+ m3 = (c.forward(), pid1)
+ heappush(events, m3)
+ m4 = (30, pid1)
+ heappush(events, m4)
+ m5 = (30, pid2)
+ heappush(events, m5)
+
+ self.assertEqual(str(c), str(c.value))
+
+ self.assertEqual(c.sort_heap(events), m1)
+ self.assertEqual(c.sort_heap([m4, m5]), m4)
+ self.assertEqual(c.sort_heap([m4, m5, m1]), m4)
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 524f60b9..ffcf9424 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -5,13 +5,55 @@ import socket
from mock import patch
from kombu import common
-from kombu.common import (Broadcast, maybe_declare,
- send_reply, isend_reply, collect_replies)
+from kombu.common import (
+ Broadcast, maybe_declare,
+ send_reply, isend_reply, collect_replies,
+ declaration_cached, ignore_errors,
+ QoS, PREFETCH_COUNT_MAX,
+ entry_to_queue,
+)
+from kombu.exceptions import StdChannelError
from .utils import TestCase
from .utils import ContextMock, Mock, MockPool
+class test_ignore_errors(TestCase):
+
+ def test_ignored(self):
+ connection = Mock()
+ connection.channel_errors = (KeyError, )
+ connection.connection_errors = (KeyError, )
+
+ with ignore_errors(connection):
+ raise KeyError()
+
+ def raising():
+ raise KeyError()
+
+ ignore_errors(connection, raising)
+
+ connection.channel_errors = connection.connection_errors = \
+ ()
+
+ with self.assertRaises(KeyError):
+ with ignore_errors(connection):
+ raise KeyError()
+
+
+class test_declaration_cached(TestCase):
+
+ def test_when_cached(self):
+ chan = Mock()
+ chan.connection.client.declared_entities = ['foo']
+ self.assertTrue(declaration_cached('foo', chan))
+
+ def test_when_not_cached(self):
+ chan = Mock()
+ chan.connection.client.declared_entities = ['bar']
+ self.assertFalse(declaration_cached('foo', chan))
+
+
class test_Broadcast(TestCase):
def test_arguments(self):
@@ -46,6 +88,10 @@ class test_maybe_declare(TestCase):
maybe_declare(entity, channel)
self.assertEqual(entity.declare.call_count, 1)
+ entity.channel.connection = None
+ with self.assertRaises(StdChannelError):
+ maybe_declare(entity)
+
def test_binds_entities(self):
channel = Mock()
channel.connection.client.declared_entities = set()
@@ -298,3 +344,125 @@ class test_itermessages(TestCase):
with self.assertRaises(StopIteration):
next(it)
+
+
+class test_entry_to_queue(TestCase):
+
+ def test_calls_Queue_from_dict(self):
+ with patch('kombu.common.Queue') as Queue:
+ entry_to_queue('name', exchange='bar')
+ Queue.from_dict.assert_called_with('name', exchange='bar')
+
+
+class test_QoS(TestCase):
+
+ class _QoS(QoS):
+ def __init__(self, value):
+ self.value = value
+ QoS.__init__(self, None, value)
+
+ def set(self, value):
+ return value
+
+ def test_qos_exceeds_16bit(self):
+ with patch('kombu.common.logger') as logger:
+ callback = Mock()
+ qos = QoS(callback, 10)
+ qos.prev = 100
+ qos.set(2 ** 32)
+ self.assertTrue(logger.warn.called)
+ callback.assert_called_with(prefetch_count=0)
+
+ def test_qos_increment_decrement(self):
+ qos = self._QoS(10)
+ self.assertEqual(qos.increment_eventually(), 11)
+ self.assertEqual(qos.increment_eventually(3), 14)
+ self.assertEqual(qos.increment_eventually(-30), 14)
+ self.assertEqual(qos.decrement_eventually(7), 7)
+ self.assertEqual(qos.decrement_eventually(), 6)
+
+ def test_qos_disabled_increment_decrement(self):
+ qos = self._QoS(0)
+ self.assertEqual(qos.increment_eventually(), 0)
+ self.assertEqual(qos.increment_eventually(3), 0)
+ self.assertEqual(qos.increment_eventually(-30), 0)
+ self.assertEqual(qos.decrement_eventually(7), 0)
+ self.assertEqual(qos.decrement_eventually(), 0)
+ self.assertEqual(qos.decrement_eventually(10), 0)
+
+ def test_qos_thread_safe(self):
+ qos = self._QoS(10)
+
+ def add():
+ for i in range(1000):
+ qos.increment_eventually()
+
+ def sub():
+ for i in range(1000):
+ qos.decrement_eventually()
+
+ def threaded(funs):
+ from threading import Thread
+ threads = [Thread(target=fun) for fun in funs]
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+
+ threaded([add, add])
+ self.assertEqual(qos.value, 2010)
+
+ qos.value = 1000
+ threaded([add, sub]) # n = 2
+ self.assertEqual(qos.value, 1000)
+
+ def test_exceeds_short(self):
+ qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1)
+ qos.update()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
+ qos.increment_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
+ qos.increment_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX + 1)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
+
+ def test_consumer_increment_decrement(self):
+ mconsumer = Mock()
+ qos = QoS(mconsumer.qos, 10)
+ qos.update()
+ self.assertEqual(qos.value, 10)
+ mconsumer.qos.assert_called_with(prefetch_count=10)
+ qos.decrement_eventually()
+ qos.update()
+ self.assertEqual(qos.value, 9)
+ mconsumer.qos.assert_called_with(prefetch_count=9)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 8)
+ mconsumer.qos.assert_called_with(prefetch_count=9)
+ self.assertIn({'prefetch_count': 9}, mconsumer.qos.call_args)
+
+ # Does not decrement 0 value
+ qos.value = 0
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 0)
+ qos.increment_eventually()
+ self.assertEqual(qos.value, 0)
+
+ def test_consumer_decrement_eventually(self):
+ mconsumer = Mock()
+ qos = QoS(mconsumer.qos, 10)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 9)
+ qos.value = 0
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 0)
+
+ def test_set(self):
+ mconsumer = Mock()
+ qos = QoS(mconsumer.qos, 10)
+ qos.set(12)
+ self.assertEqual(qos.prev, 12)
+ qos.set(qos.prev)
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index 9bdf6343..4af9eb37 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -1,7 +1,11 @@
from __future__ import absolute_import
+import errno
import pickle
+import socket
+from copy import copy
+from mock import patch
from nose import SkipTest
from kombu import Connection, Consumer, Producer, parse_url
@@ -10,6 +14,7 @@ from kombu.five import items
from .mocks import Transport
from .utils import TestCase
+
from .utils import Mock, skip_if_not_module
@@ -154,6 +159,157 @@ class test_Connection(TestCase):
self.assertFalse(_connection.connected)
self.assertIsInstance(conn.transport, Transport)
+ def test_multiple_urls(self):
+ conn1 = Connection('amqp://foo;amqp://bar')
+ self.assertEqual(conn1.hostname, 'foo')
+ self.assertListEqual(conn1.alt, ['amqp://foo', 'amqp://bar'])
+
+ conn2 = Connection(['amqp://foo', 'amqp://bar'])
+ self.assertEqual(conn2.hostname, 'foo')
+ self.assertListEqual(conn2.alt, ['amqp://foo', 'amqp://bar'])
+
+ def test_uri_passthrough(self):
+ from kombu import connection as mod
+ prev, mod.URI_PASSTHROUGH = mod.URI_PASSTHROUGH, set(['foo'])
+ try:
+ with patch('kombu.connection.parse_url') as parse_url:
+ c = Connection('foo+mysql://some_host')
+ self.assertEqual(c.transport_cls, 'foo')
+ self.assertFalse(parse_url.called)
+ self.assertEqual(c.hostname, 'mysql://some_host')
+ self.assertTrue(c.as_uri().startswith('foo+'))
+ with patch('kombu.connection.parse_url') as parse_url:
+ c = Connection('mysql://some_host', transport='foo')
+ self.assertEqual(c.transport_cls, 'foo')
+ self.assertFalse(parse_url.called)
+ self.assertEqual(c.hostname, 'mysql://some_host')
+ finally:
+ mod.URI_PASSTHROUGH = prev
+ c = Connection('amqp+sqlite://some_host')
+ self.assertTrue(c.as_uri().startswith('amqp+'))
+
+ def test_default_ensure_callback(self):
+ with patch('kombu.connection.logger') as logger:
+ c = Connection(transport=Mock)
+ c._default_ensure_callback(KeyError(), 3)
+ self.assertTrue(logger.error.called)
+
+ def test_ensure_connection_on_error(self):
+ c = Connection('amqp://A;amqp://B')
+ with patch('kombu.connection.retry_over_time') as rot:
+ c.ensure_connection()
+ self.assertTrue(rot.called)
+
+ args = rot.call_args[0]
+ cb = args[4]
+ intervals = iter([1, 2, 3, 4, 5])
+ self.assertEqual(cb(KeyError(), intervals, 0), 0)
+ self.assertEqual(cb(KeyError(), intervals, 1), 1)
+ self.assertEqual(cb(KeyError(), intervals, 2), 0)
+ self.assertEqual(cb(KeyError(), intervals, 3), 2)
+ self.assertEqual(cb(KeyError(), intervals, 4), 0)
+ self.assertEqual(cb(KeyError(), intervals, 5), 3)
+ self.assertEqual(cb(KeyError(), intervals, 6), 0)
+ self.assertEqual(cb(KeyError(), intervals, 7), 4)
+
+ errback = Mock()
+ c.ensure_connection(errback=errback)
+ args = rot.call_args[0]
+ cb = args[4]
+ self.assertEqual(cb(KeyError(), intervals, 0), 0)
+ self.assertTrue(errback.called)
+
+ def test_drain_nowait(self):
+ c = Connection(transport=Mock)
+ c.drain_events = Mock()
+ c.drain_events.side_effect = socket.timeout()
+
+ c.more_to_read = True
+ self.assertFalse(c.drain_nowait())
+ self.assertFalse(c.more_to_read)
+
+ c.drain_events.side_effect = socket.error()
+ c.drain_events.side_effect.errno = errno.EAGAIN
+ c.more_to_read = True
+ self.assertFalse(c.drain_nowait())
+ self.assertFalse(c.more_to_read)
+
+ c.drain_events.side_effect = socket.error()
+ c.drain_events.side_effect.errno = errno.EPERM
+ with self.assertRaises(socket.error):
+ c.drain_nowait()
+
+ c.more_to_read = False
+ c.drain_events = Mock()
+ self.assertTrue(c.drain_nowait())
+ c.drain_events.assert_called_with(timeout=0)
+ self.assertTrue(c.more_to_read)
+
+ def test_supports_heartbeats(self):
+ c = Connection(transport=Mock)
+ c.transport.supports_heartbeats = False
+ self.assertFalse(c.supports_heartbeats)
+
+ def test_is_evented(self):
+ c = Connection(transport=Mock)
+ c.transport.supports_ev = False
+ self.assertFalse(c.is_evented)
+
+ def test_eventmap(self):
+ c = Connection(transport=Mock)
+ c.transport.eventmap.return_value = {1: 1, 2: 2}
+ self.assertDictEqual(c.eventmap, {1: 1, 2: 2})
+ c.transport.eventmap.assert_called_with(c.connection)
+
+ def test_manager(self):
+ c = Connection(transport=Mock)
+ self.assertIs(c.manager, c.transport.manager)
+
+ def test_copy(self):
+ c = Connection('amqp://example.com')
+ self.assertEqual(copy(c).info(), c.info())
+
+ def test_switch(self):
+ c = Connection('amqp://foo')
+ c._closed = True
+ c.switch('redis://example.com//3')
+ self.assertFalse(c._closed)
+ self.assertEqual(c.hostname, 'example.com')
+ self.assertEqual(c.transport_cls, 'redis')
+ self.assertEqual(c.virtual_host, '/3')
+
+ def test_maybe_switch_next(self):
+ c = Connection('amqp://foo;redis://example.com//3')
+ c.maybe_switch_next()
+ self.assertFalse(c._closed)
+ self.assertEqual(c.hostname, 'example.com')
+ self.assertEqual(c.transport_cls, 'redis')
+ self.assertEqual(c.virtual_host, '/3')
+
+ def test_maybe_switch_next_no_cycle(self):
+ c = Connection('amqp://foo')
+ c.maybe_switch_next()
+ self.assertFalse(c._closed)
+ self.assertEqual(c.hostname, 'foo')
+ self.assertIn(c.transport_cls, ('librabbitmq', 'pyamqp'))
+
+ def test_heartbeat_check(self):
+ c = Connection(transport=Transport)
+ c.transport.heartbeat_check = Mock()
+ c.heartbeat_check(3)
+ c.transport.heartbeat_check.assert_called_with(c.connection, rate=3)
+
+ def test_completes_cycle_no_cycle(self):
+ c = Connection('amqp://')
+ self.assertTrue(c.completes_cycle(0))
+ self.assertTrue(c.completes_cycle(1))
+
+ def test_completes_cycle(self):
+ c = Connection('amqp://a;amqp://b;amqp://c')
+ self.assertFalse(c.completes_cycle(0))
+ self.assertFalse(c.completes_cycle(1))
+ self.assertTrue(c.completes_cycle(2))
+
def test__enter____exit__(self):
conn = self.conn
context = conn.__enter__()
@@ -357,6 +513,18 @@ class ResourceCase(TestCase):
[chan.release() for chan in chans]
self.assertState(P, 10, 0)
+ def test_acquire_prepare_raises(self):
+ if self.abstract:
+ return
+ P = self.create_resource(10, 0)
+
+ self.assertEqual(len(P._resource.queue), 10)
+ P.prepare = Mock()
+ P.prepare.side_effect = IOError()
+ with self.assertRaises(IOError):
+ P.acquire(block=True)
+ self.assertEqual(len(P._resource.queue), 10)
+
def test_acquire_no_limit(self):
if self.abstract:
return
@@ -440,6 +608,12 @@ class test_ConnectionPool(ResourceCase):
self.assertIsNotNone(q[1]._connection)
self.assertIsNone(q[2]()._connection)
+ def test_release_no__debug(self):
+ P = self.create_resource(10, 2)
+ R = Mock()
+ R._debug.side_effect = AttributeError()
+ P.release_resource(R)
+
def test_setup_no_limit(self):
P = self.create_resource(None, None)
self.assertFalse(P._resource.queue)
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index cdd9c34a..fadaf358 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -1,7 +1,10 @@
from __future__ import absolute_import
-from kombu import Connection
-from kombu.entity import Exchange, Queue
+import pickle
+
+from mock import call
+
+from kombu import Connection, Exchange, Queue, binding
from kombu.exceptions import NotBoundError
from .mocks import Transport
@@ -13,6 +16,48 @@ def get_conn():
return Connection(transport=Transport)
+class test_binding(TestCase):
+
+ def test_constructor(self):
+ x = binding(Exchange('foo'), 'rkey',
+ arguments={'barg': 'bval'},
+ unbind_arguments={'uarg': 'uval'},
+ )
+ self.assertEqual(x.exchange, Exchange('foo'))
+ self.assertEqual(x.routing_key, 'rkey')
+ self.assertDictEqual(x.arguments, {'barg': 'bval'})
+ self.assertDictEqual(x.unbind_arguments, {'uarg': 'uval'})
+
+ def test_declare(self):
+ chan = get_conn().channel()
+ x = binding(Exchange('foo'), 'rkey')
+ x.declare(chan)
+ self.assertIn('exchange_declare', chan)
+
+ def test_declare_no_exchange(self):
+ chan = get_conn().channel()
+ x = binding()
+ x.declare(chan)
+ self.assertNotIn('exchange_declare', chan)
+
+ def test_bind(self):
+ chan = get_conn().channel()
+ x = binding(Exchange('foo'))
+ x.bind(Exchange('bar')(chan))
+ self.assertIn('exchange_bind', chan)
+
+ def test_unbind(self):
+ chan = get_conn().channel()
+ x = binding(Exchange('foo'))
+ x.unbind(Exchange('bar')(chan))
+ self.assertIn('exchange_unbind', chan)
+
+ def test_repr(self):
+ b = binding(Exchange('foo'), 'rkey')
+ self.assertIn('foo', repr(b))
+ self.assertIn('rkey', repr(b))
+
+
class test_Exchange(TestCase):
def test_bound(self):
@@ -24,7 +69,8 @@ class test_Exchange(TestCase):
bound = exchange.bind(chan)
self.assertTrue(bound.is_bound)
self.assertIs(bound.channel, chan)
- self.assertIn('<bound', repr(bound))
+ self.assertIn('bound to chan:%r' % (chan.channel_id, ),
+ repr(bound))
def test_hash(self):
self.assertEqual(hash(Exchange('a')), hash(Exchange('a')))
@@ -34,6 +80,11 @@ class test_Exchange(TestCase):
self.assertTrue(Exchange('a', durable=True).can_cache_declaration)
self.assertFalse(Exchange('a', durable=False).can_cache_declaration)
+ def test_pickle(self):
+ e1 = Exchange('foo', 'direct')
+ e2 = pickle.loads(pickle.dumps(e1))
+ self.assertEqual(e1, e2)
+
def test_eq(self):
e1 = Exchange('foo', 'direct')
e2 = Exchange('foo', 'direct')
@@ -111,6 +162,12 @@ class test_Exchange(TestCase):
foo(chan).bind_to(bar)
self.assertIn('exchange_bind', chan)
+ def test_bind_to_by_name(self):
+ chan = get_conn().channel()
+ foo = Exchange('foo', 'topic')
+ foo(chan).bind_to('bar')
+ self.assertIn('exchange_bind', chan)
+
def test_unbind_from(self):
chan = get_conn().channel()
foo = Exchange('foo', 'topic')
@@ -118,6 +175,12 @@ class test_Exchange(TestCase):
foo(chan).unbind_from(bar)
self.assertIn('exchange_unbind', chan)
+ def test_unbind_from_by_name(self):
+ chan = get_conn().channel()
+ foo = Exchange('foo', 'topic')
+ foo(chan).unbind_from('bar')
+ self.assertIn('exchange_unbind', chan)
+
class test_Queue(TestCase):
@@ -128,6 +191,14 @@ class test_Queue(TestCase):
self.assertEqual(hash(Queue('a')), hash(Queue('a')))
self.assertNotEqual(hash(Queue('a')), hash(Queue('b')))
+ def test_anonymous(self):
+ chan = Mock()
+ x = Queue(bindings=[binding(Exchange('foo'), 'rkey')])
+ chan.queue_declare.return_value = 'generated', 0, 0
+ xx = x(chan)
+ xx.declare()
+ self.assertEqual(xx.name, 'generated')
+
def test_when_bound_but_no_exchange(self):
q = Queue('a')
q.exchange = None
@@ -141,7 +212,37 @@ class test_Queue(TestCase):
q.declare()
q.queue_declare.assert_called_with(False, passive=False)
- q.queue_bind.assert_called_with(False)
+
+ def test_bind_to_when_name(self):
+ chan = Mock()
+ q = Queue('a')
+ q(chan).bind_to('ex')
+ self.assertTrue(chan.queue_bind.called)
+
+ def test_get_when_no_m2p(self):
+ chan = Mock()
+ q = Queue('a')(chan)
+ chan.message_to_python = None
+ self.assertTrue(q.get())
+
+ def test_multiple_bindings(self):
+ chan = Mock()
+ q = Queue('mul', [
+ binding(Exchange('mul1'), 'rkey1'),
+ binding(Exchange('mul2'), 'rkey2'),
+ binding(Exchange('mul3'), 'rkey3'),
+ ])
+ q(chan).declare()
+ self.assertIn(
+ call(nowait=False,
+ exchange='mul1',
+ auto_delete=False,
+ passive=False,
+ arguments=None,
+ type='direct',
+ durable=True,
+ ), chan.exchange_declare.call_args_list,
+ )
def test_can_cache_declaration(self):
self.assertTrue(Queue('a', durable=True).can_cache_declaration)
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 4c5fe619..249af87f 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -1,13 +1,14 @@
from __future__ import absolute_import, unicode_literals
import anyjson
+import pickle
+from collections import defaultdict
from mock import patch
-from kombu import Connection
+from kombu import Connection, Consumer, Producer, Exchange, Queue
from kombu.exceptions import MessageStateError
-from kombu.messaging import Consumer, Producer
-from kombu.entity import Exchange, Queue
+from kombu.utils import ChannelPromise
from .mocks import Transport
from .utils import TestCase
@@ -23,6 +24,16 @@ class test_Producer(TestCase):
self.assertTrue(self.connection.connection.connected)
self.assertFalse(self.exchange.is_bound)
+ def test_pickle(self):
+ chan = Mock()
+ producer = Producer(chan, serializer='pickle')
+ p2 = pickle.loads(pickle.dumps(producer))
+ self.assertEqual(p2.serializer, producer.serializer)
+
+ def test_no_channel(self):
+ p = Producer(None)
+ self.assertFalse(p._channel)
+
@patch('kombu.common.maybe_declare')
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
@@ -109,11 +120,25 @@ class test_Producer(TestCase):
def test_publish_with_Exchange_instance(self):
p = self.connection.Producer()
p.channel = Mock()
- p.publish('hello', exchange=Exchange('foo'))
+ p.publish('hello', exchange=Exchange('foo'), delivery_mode='transient')
self.assertEqual(
p._channel.basic_publish.call_args[1]['exchange'], 'foo',
)
+ def test_set_on_return(self):
+ chan = Mock()
+ chan.events = defaultdict(Mock)
+ p = Producer(ChannelPromise(lambda: chan), on_return='on_return')
+ p.channel
+ chan.events['basic_return'].add.assert_called_with('on_return')
+
+ def test_publish_retry_calls_ensure(self):
+ p = Producer(Mock())
+ p._connection = Mock()
+ ensure = p.connection.ensure = Mock()
+ p.publish('foo', exchange='foo', retry=True)
+ self.assertTrue(ensure.called)
+
def test_publish_retry_with_declare(self):
p = self.connection.Producer()
p.maybe_declare = Mock()
@@ -195,6 +220,12 @@ class test_Consumer(TestCase):
self.assertTrue(self.connection.connection.connected)
self.exchange = Exchange('foo', 'direct')
+ def test_set_no_channel(self):
+ c = Consumer(None)
+ self.assertIsNone(c.channel)
+ c.revive(Mock())
+ self.assertTrue(c.channel)
+
def test_set_no_ack(self):
channel = self.connection.channel()
queue = Queue('qname', self.exchange, 'rkey')
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index 32d1294f..a9386602 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -1,9 +1,8 @@
from __future__ import absolute_import
-from kombu import Connection
+from kombu import Connection, Producer
from kombu import pools
from kombu.connection import ConnectionPool
-from kombu.messaging import Producer
from kombu.utils import eqhash
from .utils import TestCase
@@ -26,6 +25,34 @@ class test_ProducerPool(TestCase):
self.connections = Mock()
self.pool = self.Pool(self.connections, limit=10)
+ def test_releases_connection_when_Producer_raises(self):
+ self.pool.Producer = Mock()
+ self.pool.Producer.side_effect = IOError()
+ acq = self.pool._acquire_connection = Mock()
+ conn = acq.return_value = Mock()
+ with self.assertRaises(IOError):
+ self.pool.create_producer()
+ conn.release.assert_called_with()
+
+ def test_prepare_release_connection_on_error(self):
+ pp = Mock()
+ p = pp.return_value = Mock()
+ p.revive.side_effect = IOError()
+ acq = self.pool._acquire_connection = Mock()
+ conn = acq.return_value = Mock()
+ p._channel = None
+ with self.assertRaises(IOError):
+ self.pool.prepare(pp)
+ conn.release.assert_called_with()
+
+ def test_release_releases_connection(self):
+ p = Mock()
+ p.__connection__ = Mock()
+ self.pool.release(p)
+ p.__connection__.release.assert_called_with()
+ p.__connection__ = None
+ self.pool.release(p)
+
def test_init(self):
self.assertIs(self.pool.connections, self.connections)
@@ -57,7 +84,7 @@ class test_ProducerPool(TestCase):
def test_prepare(self):
connection = self.connections.acquire.return_value = Mock()
pool = self.MyPool(self.connections, limit=10)
- pool.instance.channel = None
+ pool.instance._channel = None
first = pool._resource.get_nowait()
producer = pool.prepare(first)
self.assertTrue(self.connections.acquire.called)
@@ -66,7 +93,7 @@ class test_ProducerPool(TestCase):
def test_prepare_channel_already_created(self):
self.connections.acquire.return_value = Mock()
pool = self.MyPool(self.connections, limit=10)
- pool.instance.channel = Mock()
+ pool.instance._channel = Mock()
first = pool._resource.get_nowait()
self.connections.acquire.reset()
producer = pool.prepare(first)
diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py
index 3633f8b1..38b0f581 100644
--- a/kombu/tests/test_utils.py
+++ b/kombu/tests/test_utils.py
@@ -5,12 +5,15 @@ import sys
from functools import wraps
from io import BytesIO, StringIO
+from mock import Mock, patch
from kombu import utils
from kombu.five import string_t
-from .utils import redirect_stdouts, mask_modules, skip_if_module
-from .utils import TestCase
+from .utils import (
+ TestCase,
+ redirect_stdouts, mask_modules, module_exists, skip_if_module,
+)
class OldString(object):
@@ -28,6 +31,13 @@ class OldString(object):
return self.value.rsplit(*args, **kwargs)
+class test_kombu_module(TestCase):
+
+ def test_dir(self):
+ import kombu
+ self.assertTrue(dir(kombu))
+
+
class test_utils(TestCase):
def test_maybe_list(self):
@@ -174,10 +184,21 @@ class test_retry_over_time(TestCase):
@insomnia
def test_simple(self):
- x = utils.retry_over_time(self.myfun, self.Predicate,
- errback=self.errback, interval_max=14)
- self.assertEqual(x, 42)
- self.assertEqual(self.index, 9)
+ prev_count, utils.count = utils.count, Mock()
+ try:
+ utils.count.return_value = range(1)
+ x = utils.retry_over_time(self.myfun, self.Predicate,
+ errback=None, interval_max=14)
+ self.assertIsNone(x)
+ utils.count.return_value = range(10)
+ cb = Mock()
+ x = utils.retry_over_time(self.myfun, self.Predicate,
+ errback=self.errback, callback=cb, interval_max=14)
+ self.assertEqual(x, 42)
+ self.assertEqual(self.index, 9)
+ cb.assert_called_with()
+ finally:
+ utils.count = prev_count
@insomnia
def test_retry_once(self):
@@ -200,6 +221,26 @@ class test_retry_over_time(TestCase):
class test_cached_property(TestCase):
+ def test_deleting(self):
+
+ class X(object):
+ xx = False
+
+ @utils.cached_property
+ def foo(self):
+ return 42
+
+ @foo.deleter # noqa
+ def foo(self, value):
+ self.xx = value
+
+ x = X()
+ del(x.foo)
+ self.assertFalse(x.xx)
+ x.__dict__['foo'] = 'here'
+ del(x.foo)
+ self.assertEqual(x.xx, 'here')
+
def test_when_access_from_class(self):
class X(object):
@@ -226,3 +267,75 @@ class test_cached_property(TestCase):
self.assertEqual(x.xx, 10)
del(x.foo)
+
+
+class test_symbol_by_name(TestCase):
+
+ def test_instance_returns_instance(self):
+ instance = object()
+ self.assertIs(utils.symbol_by_name(instance), instance)
+
+ def test_returns_default(self):
+ default = object()
+ self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz',
+ default=default), default)
+
+ def test_no_default(self):
+ with self.assertRaises(ImportError):
+ utils.symbol_by_name('xyz.ryx.qedoa.weq:foz')
+
+ def test_imp_reraises_ValueError(self):
+ imp = Mock()
+ imp.side_effect = ValueError()
+ with self.assertRaises(ValueError):
+ utils.symbol_by_name('kombu.Connection', imp=imp)
+
+ def test_package(self):
+ from kombu.entity import Exchange
+ self.assertIs(utils.symbol_by_name('.entity:Exchange',
+ package='kombu'), Exchange)
+ self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu'))
+
+
+class test_ChannelPromise(TestCase):
+
+ def test_repr(self):
+ self.assertEqual(repr(utils.ChannelPromise(lambda: 'foo')),
+ "<promise: 'foo'>")
+
+
+class test_entrypoints(TestCase):
+
+ @mask_modules('pkg_resources')
+ def test_without_pkg_resources(self):
+ self.assertListEqual(list(utils.entrypoints('kombu.test')), [])
+
+ @module_exists('pkg_resources')
+ def test_with_pkg_resources(self):
+ with patch('pkg_resources.iter_entry_points', create=True) as iterep:
+ eps = iterep.return_value = [Mock(), Mock()]
+
+ self.assertTrue(list(utils.entrypoints('kombu.test')))
+ iterep.assert_called_with('kombu.test')
+ eps[0].load.assert_called_with()
+ eps[1].load.assert_called_with()
+
+
+class test_shufflecycle(TestCase):
+
+ def test_shuffles(self):
+ prev_repeat, utils.repeat = utils.repeat, Mock()
+ try:
+ utils.repeat.return_value = range(10)
+ values = set(['A', 'B', 'C'])
+ cycle = utils.shufflecycle(values)
+ seen = set()
+ for i in xrange(10):
+ cycle.next()
+ utils.repeat.assert_called_with(None)
+ self.assertTrue(seen.issubset(values))
+ with self.assertRaises(StopIteration):
+ cycle.next()
+ cycle.next()
+ finally:
+ utils.repeat = prev_repeat
diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py
index 6ce95cc9..fb6c0141 100644
--- a/kombu/tests/transport/test_amqplib.py
+++ b/kombu/tests/transport/test_amqplib.py
@@ -4,7 +4,7 @@ import sys
from nose import SkipTest
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase
from kombu.tests.utils import mask_modules, Mock
diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py
index 6a8d64d1..7bdad8f5 100644
--- a/kombu/tests/transport/test_filesystem.py
+++ b/kombu/tests/transport/test_filesystem.py
@@ -4,9 +4,7 @@ import tempfile
from nose import SkipTest
-from kombu.connection import Connection
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.tests.utils import TestCase
diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py
index 95e3432e..970681d7 100644
--- a/kombu/tests/transport/test_memory.py
+++ b/kombu/tests/transport/test_memory.py
@@ -2,9 +2,7 @@ from __future__ import absolute_import
import socket
-from kombu.connection import Connection
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.tests.utils import TestCase
diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py
index b05e7d9f..522df04a 100644
--- a/kombu/tests/transport/test_mongodb.py
+++ b/kombu/tests/transport/test_mongodb.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from nose import SkipTest
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase, skip_if_not_module
diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py
index b48ec153..7936484f 100644
--- a/kombu/tests/transport/test_pyamqp.py
+++ b/kombu/tests/transport/test_pyamqp.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import sys
+from mock import patch
from nose import SkipTest
try:
@@ -10,7 +11,7 @@ except ImportError:
pyamqp = None # noqa
else:
from kombu.transport import pyamqp
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase
from kombu.tests.utils import mask_modules, Mock
@@ -160,3 +161,26 @@ class test_pyamqp(TestCase):
c = Connection(port=1337, transport=Transport).connect()
self.assertEqual(c['host'], '127.0.0.1:1337')
+
+ def test_eventmap(self):
+ t = pyamqp.Transport(Mock())
+ conn = Mock()
+ self.assertDictEqual(t.eventmap(conn),
+ {conn.sock: t.client.drain_nowait})
+
+ def test_event_interface(self):
+ t = pyamqp.Transport(Mock())
+ t.on_poll_init(Mock())
+ t.on_poll_start()
+
+ def test_heartbeat_check(self):
+ t = pyamqp.Transport(Mock())
+ conn = Mock()
+ t.heartbeat_check(conn, rate=4.331)
+ conn.heartbeat_tick.assert_called_with(rate=4.331)
+
+ def test_get_manager(self):
+ with patch('kombu.transport.pyamqp.get_manager') as get_manager:
+ t = pyamqp.Transport(Mock())
+ t.get_manager(1, kw=2)
+ get_manager.assert_called_with(t.client, 1, kw=2)
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 403e815e..cf166e87 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -7,11 +7,9 @@ from anyjson import dumps
from collections import defaultdict
from itertools import count
-from kombu.connection import Connection
-from kombu.entity import Exchange, Queue
+from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, Queue as _Queue
-from kombu.messaging import Consumer, Producer
from kombu.utils import eventio # patch poll
from kombu.tests.utils import TestCase
@@ -42,9 +40,7 @@ class Client(object):
hashes = defaultdict(dict)
shard_hint = None
- def __init__(self, db=None, port=None, **kwargs):
- self.port = port
- self.db = db
+ def __init__(self, db=None, port=None, connection_pool=None, **kwargs):
self._called = []
self._connection = None
self.bgsave_raises_ResponseError = False
@@ -384,10 +380,9 @@ class test_Channel(TestCase):
c.connection.disconnect.assert_called_with()
def test_invalid_database_raises_ValueError(self):
- self.channel.connection.client.virtual_host = 'xfeqwewkfk'
with self.assertRaises(ValueError):
- self.channel._create_client()
+ Connection('redis:///dwqwewqe').channel()
@skip_if_not_module('redis')
def test_get_client(self):
@@ -395,7 +390,7 @@ class test_Channel(TestCase):
KombuRedis = redis.Channel._get_client(self.channel)
self.assertTrue(KombuRedis)
- Rv = getattr(R, 'VERSION')
+ Rv = getattr(R, 'VERSION', None)
try:
R.VERSION = (2, 4, 0)
with self.assertRaises(VersionMismatch):
@@ -528,29 +523,23 @@ class test_Redis(TestCase):
channel.close()
def test_db_values(self):
- c1 = Connection(virtual_host=1,
- transport=Transport).channel()
- self.assertEqual(c1.client.db, 1)
+ Connection(virtual_host=1,
+ transport=Transport).channel()
- c2 = Connection(virtual_host='1',
- transport=Transport).channel()
- self.assertEqual(c2.client.db, 1)
+ Connection(virtual_host='1',
+ transport=Transport).channel()
- c3 = Connection(virtual_host='/1',
- transport=Transport).channel()
- self.assertEqual(c3.client.db, 1)
+ Connection(virtual_host='/1',
+ transport=Transport).channel()
with self.assertRaises(Exception):
- Connection(virtual_host='/foo',
- transport=Transport).channel()
+ Connection('redis:///foo').channel()
def test_db_port(self):
c1 = Connection(port=None, transport=Transport).channel()
- self.assertEqual(c1.client.port, Transport.default_port)
c1.close()
c2 = Connection(port=9999, transport=Transport).channel()
- self.assertEqual(c2.client.port, 9999)
c2.close()
def test_close_poller_not_active(self):
diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py
index 82b51ebc..f3d7d239 100644
--- a/kombu/tests/transport/test_sqlalchemy.py
+++ b/kombu/tests/transport/test_sqlalchemy.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
from mock import patch
from nose import SkipTest
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index eb7eaf6d..232be80b 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -91,6 +91,9 @@ class test_QoS(TestCase):
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
+ self.q.restore_at_shutdown = False
+ self.q.restore_unacked_once()
+
def test_get(self):
self.q._delivered['foo'] = 1
self.assertEqual(self.q.get('foo'), 1)
@@ -176,10 +179,34 @@ class test_Channel(TestCase):
if self.channel._qos is not None:
self.channel._qos._on_collect.cancel()
+ def test_exchange_bind_interface(self):
+ with self.assertRaises(NotImplementedError):
+ self.channel.exchange_bind('dest', 'src', 'key')
+
+ def test_exchange_unbind_interface(self):
+ with self.assertRaises(NotImplementedError):
+ self.channel.exchange_unbind('dest', 'src', 'key')
+
+ def test_queue_unbind_interface(self):
+ with self.assertRaises(NotImplementedError):
+ self.channel.queue_unbind('dest', 'ex', 'key')
+
+ def test_management(self):
+ m = self.channel.connection.client.get_manager()
+ self.assertTrue(m)
+ m.get_bindings()
+ m.close()
+
def test_exchange_declare(self):
c = self.channel
+
+ with self.assertRaises(StdChannelError):
+ c.exchange_declare('test_exchange_declare', 'direct',
+ durable=True, auto_delete=True, passive=True)
c.exchange_declare('test_exchange_declare', 'direct',
durable=True, auto_delete=True)
+ c.exchange_declare('test_exchange_declare', 'direct',
+ durable=True, auto_delete=True, passive=True)
self.assertIn('test_exchange_declare', c.state.exchanges)
# can declare again with same values
c.exchange_declare('test_exchange_declare', 'direct',
@@ -348,7 +375,7 @@ class test_Channel(TestCase):
exc = None
try:
raise KeyError()
- except KeyError as exc_:
+ except KeyError, exc_:
exc = exc_
ru.return_value = [(exc, 1)]
diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py
new file mode 100644
index 00000000..8f85a2c3
--- /dev/null
+++ b/kombu/tests/utilities/test_amq_manager.py
@@ -0,0 +1,35 @@
+from __future__ import absolute_import
+
+from mock import patch
+
+from kombu import Connection
+from kombu.tests.utils import TestCase, mask_modules, module_exists
+
+
+class test_get_manager(TestCase):
+
+ @mask_modules('pyrabbit')
+ def test_without_pyrabbit(self):
+ with self.assertRaises(ImportError):
+ Connection('amqp://').get_manager()
+
+ @module_exists('pyrabbit')
+ def test_with_pyrabbit(self):
+ with patch('pyrabbit.Client', create=True) as Client:
+ manager = Connection('amqp://').get_manager()
+ self.assertIsNotNone(manager)
+ Client.assert_called_with('localhost:55672',
+ 'guest', 'guest')
+
+ @module_exists('pyrabbit')
+ def test_transport_options(self):
+ with patch('pyrabbit.Client', create=True) as Client:
+ manager = Connection('amqp://', transport_options={
+ 'manager_hostname': 'admin.mq.vandelay.com',
+ 'manager_port': 808,
+ 'manager_userid': 'george',
+ 'manager_password': 'bosco',
+ }).get_manager()
+ self.assertIsNotNone(manager)
+ Client.assert_called_with('admin.mq.vandelay.com:808',
+ 'george', 'bosco')
diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py
new file mode 100644
index 00000000..d364400f
--- /dev/null
+++ b/kombu/tests/utilities/test_debug.py
@@ -0,0 +1,58 @@
+from __future__ import absolute_import
+
+import logging
+
+from mock import Mock, patch
+
+from kombu.utils.debug import (
+ setup_logging,
+ Logwrapped,
+)
+from kombu.tests.utils import TestCase
+
+
+class test_setup_logging(TestCase):
+
+ def test_adds_handlers_sets_level(self):
+ with patch('kombu.utils.debug.get_logger') as get_logger:
+ logger = get_logger.return_value = Mock()
+ setup_logging(loggers=['kombu.test'])
+
+ get_logger.assert_called_with('kombu.test')
+
+ self.assertTrue(logger.addHandler.called)
+ logger.setLevel.assert_called_with(logging.DEBUG)
+
+
+class test_Logwrapped(TestCase):
+
+ def test_wraps(self):
+ with patch('kombu.utils.debug.get_logger') as get_logger:
+ logger = get_logger.return_value = Mock()
+
+ W = Logwrapped(Mock(), 'kombu.test')
+ get_logger.assert_called_with('kombu.test')
+ self.assertIsNotNone(W.instance)
+ self.assertIs(W.logger, logger)
+
+ W.instance.__repr__ = lambda s: 'foo'
+ self.assertEqual(repr(W), 'foo')
+ self.assertListEqual(dir(W), dir(W.instance))
+
+ W.instance.some_attr = 303
+ self.assertEqual(W.some_attr, 303)
+
+ W.instance.some_method.__name__ = 'some_method'
+ W.some_method(1, 2, kw=1)
+ W.instance.some_method.assert_called_with(1, 2, kw=1)
+
+ W.some_method()
+ W.instance.some_method.assert_called_with()
+
+ W.some_method(kw=1)
+ W.instance.some_method.assert_called_with(kw=1)
+
+ W.ident = 'ident'
+ W.some_method(kw=1)
+ self.assertTrue(logger.debug.called)
+ self.assertIn('ident', logger.debug.call_args[0][0])
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 2020b41a..12d92226 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -4,9 +4,6 @@ kombu.transport.SQS
Amazon SQS transport.
-:copyright: (c) 2010 - 2012 by Ask Solem
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -273,7 +270,7 @@ class Channel(virtual.Channel):
if conn:
try:
conn.close()
- except AttributeError as exc: # FIXME ???
+ except AttributeError, exc: # FIXME ???
if "can't set attribute" not in str(exc):
raise
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index e8f01486..e14015ef 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -4,9 +4,6 @@ kombu.transport
Built-in transports.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -51,8 +48,6 @@ TRANSPORT_ALIASES = {
'amqp': 'kombu.transport.pyamqp:Transport',
'pyamqp': 'kombu.transport.pyamqp:Transport',
'librabbitmq': 'kombu.transport.librabbitmq:Transport',
- 'pika': 'kombu.transport.pika2:Transport',
- 'oldpika': 'kombu.transport.pika:SyncTransport',
'memory': 'kombu.transport.memory:Transport',
'redis': 'kombu.transport.redis:Transport',
'SQS': 'kombu.transport.SQS:Transport',
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index 9f9f467d..1f9697fb 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -4,9 +4,6 @@ kombu.transport.amqplib
amqplib transport.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -59,7 +56,7 @@ class TCPTransport(transport.TCPTransport):
while len(self._read_buffer) < n:
try:
s = self.sock.recv(65536)
- except socket.error as exc:
+ except socket.error, exc:
if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
continue
raise
@@ -108,7 +105,7 @@ class SSLTransport(transport.SSLTransport):
while len(result) < n:
try:
s = self.sslobj.read(n - len(result))
- except socket.error as exc:
+ except socket.error, exc:
if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
continue
raise
@@ -192,7 +189,7 @@ class Connection(amqp.Connection): # pragma: no cover
try:
try:
return self.method_reader.read_method()
- except SSLError as exc:
+ except SSLError, exc:
# http://bugs.python.org/issue10272
if 'timed out' in str(exc):
raise socket.timeout()
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index c402f2f7..ff7a1640 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -4,9 +4,6 @@ kombu.transport.base
Base transport interface.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -107,14 +104,14 @@ class Message(object):
def ack_log_error(self, logger, errors):
try:
self.ack()
- except errors as exc:
+ except errors, exc:
logger.critical("Couldn't ack %r, reason:%r",
self.delivery_tag, exc, exc_info=True)
def reject_log_error(self, logger, errors):
try:
self.reject()
- except errors as exc:
+ except errors, exc:
logger.critical("Couldn't ack %r, reason: %r",
self.delivery_tag, exc, exc_info=True)
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index 7578c55a..3548a8c0 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -6,9 +6,6 @@ kombu.transport.librabbitmq
.. _`librabbitmq`: http://pypi.python.org/librabbitmq/
-:copyright: (c) 2010 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -116,6 +113,9 @@ class Transport(base.Transport):
"""Close the AMQP broker connection."""
connection.close()
+ def verify_connection(self, connection):
+ return connection.connected
+
def on_poll_init(self, poller):
pass
diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py
index b8fe4058..1c67956d 100644
--- a/kombu/transport/memory.py
+++ b/kombu/transport/memory.py
@@ -4,9 +4,6 @@ kombu.transport.memory
In-memory transport.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 18c49971..91603688 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -55,7 +55,7 @@ class Channel(virtual.Channel):
msg = self.client.command('findandmodify', 'messages',
query={'queue': queue},
sort={'_id': pymongo.ASCENDING}, remove=True)
- except errors.OperationFailure as exc:
+ except errors.OperationFailure, exc:
if 'No matching object found' in exc.args[0]:
raise Empty()
raise
diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py
deleted file mode 100644
index ce60dd6c..00000000
--- a/kombu/transport/pika.py
+++ /dev/null
@@ -1,262 +0,0 @@
-"""
-kombu.transport.pika
-====================
-
-Pika transport.
-
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
-"""
-from __future__ import absolute_import
-
-import socket
-
-from operator import attrgetter
-
-from kombu.exceptions import (
- StdConnectionError,
- StdChannelError,
- VersionMismatch,
-)
-
-from . import base
-
-from pika import channel # must be here to raise import error
-try:
- from pika import asyncore_adapter
-except ImportError:
- raise VersionMismatch('Kombu only works with pika version 0.5.2')
-from pika import blocking_adapter
-from pika import connection
-from pika import exceptions
-from pika.spec import Basic, BasicProperties
-
-
-DEFAULT_PORT = 5672
-
-
-BASIC_PROPERTIES = ('content_type', 'content_encoding',
- 'headers', 'delivery_mode', 'priority',
- 'correlation_id', 'reply_to', 'expiration',
- 'message_id', 'timestamp', 'type', 'user_id',
- 'app_id', 'cluster_id')
-
-
-class Message(base.Message):
-
- def __init__(self, channel, amqp_message, **kwargs):
- channel_id, method, props, body = amqp_message
- propdict = dict(zip(BASIC_PROPERTIES,
- attrgetter(*BASIC_PROPERTIES)(props)))
-
- kwargs.update({'body': body,
- 'delivery_tag': method.delivery_tag,
- 'content_type': props.content_type,
- 'content_encoding': props.content_encoding,
- 'headers': props.headers,
- 'properties': propdict,
- 'delivery_info': dict(
- consumer_tag=getattr(method, 'consumer_tag', None),
- routing_key=method.routing_key,
- delivery_tag=method.delivery_tag,
- redelivered=method.redelivered,
- exchange=method.exchange)})
-
- super(Message, self).__init__(channel, **kwargs)
-
-
-class Channel(channel.Channel, base.StdChannel):
- Message = Message
-
- def basic_get(self, queue, no_ack):
- method = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack)
- # pika returns semi-predicates (GetEmpty/GetOk).
- if isinstance(method, Basic.GetEmpty):
- return
- return None, method, method._properties, method._body
-
- def queue_purge(self, queue=None, nowait=False):
- return channel.Channel.queue_purge(self, queue=queue, nowait=nowait) \
- .message_count
-
- def basic_publish(self, message, exchange, routing_key, mandatory=False,
- immediate=False):
- message_data, properties = message
- try:
- return channel.Channel.basic_publish(self,
- exchange,
- routing_key,
- message_data,
- properties,
- mandatory,
- immediate)
- finally:
- # Pika does not automatically flush the outbound buffer
- # TODO async: Needs to support `nowait`.
- self.handler.connection.flush_outbound()
-
- def basic_consume(self, queue, no_ack=False, consumer_tag=None,
- callback=None, nowait=False):
-
- # Kombu callbacks only take a single `message` argument,
- # but pika applies with 4 arguments, so need to wrap
- # these into a single tuple.
- def _callback_decode(channel, method, header, body):
- return callback((channel, method, header, body))
-
- return channel.Channel.basic_consume(self, _callback_decode,
- queue, no_ack,
- False, consumer_tag)
-
- def prepare_message(self, body, priority=None,
- content_type=None, content_encoding=None, headers=None,
- properties=None):
- properties = BasicProperties(priority=priority,
- content_type=content_type,
- content_encoding=content_encoding,
- headers=headers,
- **properties)
- return body, properties
-
- def message_to_python(self, raw_message):
- return self.Message(channel=self, amqp_message=raw_message)
-
- def basic_ack(self, delivery_tag):
- return channel.Channel.basic_ack(self, delivery_tag)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exc_info):
- self.close()
-
- def close(self):
- super(Channel, self).close()
- if getattr(self, 'handler', None):
- if getattr(self.handler, 'connection', None):
- self.handler.connection.channels.pop(
- self.handler.channel_number, None)
- self.handler.connection = None
- self.handler = None
-
- @property
- def channel_id(self):
- return self.channel_number
-
-
-class BlockingConnection(blocking_adapter.BlockingConnection):
- Super = blocking_adapter.BlockingConnection
-
- def __init__(self, client, *args, **kwargs):
- self.client = client
- self.Super.__init__(self, *args, **kwargs)
-
- def channel(self):
- c = Channel(channel.ChannelHandler(self))
- c.connection = self
- return c
-
- def close(self):
- self.client = None
- self.Super.close(self)
-
- def ensure_drain_events(self, timeout=None):
- return self.drain_events(timeout=timeout)
-
-
-class AsyncoreConnection(asyncore_adapter.AsyncoreConnection):
- _event_counter = 0
- Super = asyncore_adapter.AsyncoreConnection
-
- def __init__(self, client, *args, **kwargs):
- self.client = client
- self.Super.__init__(self, *args, **kwargs)
-
- def channel(self):
- c = Channel(channel.ChannelHandler(self))
- c.connection = self
- return c
-
- def ensure_drain_events(self, timeout=None):
- # asyncore connection does not raise socket.timeout when timing out
- # so need to do a little trick here to mimic the behavior
- # of sync connection.
- current_events = self._event_counter
- self.drain_events(timeout=timeout)
- if timeout and self._event_counter <= current_events:
- raise socket.timeout('timed out')
-
- def on_data_available(self, buf):
- self._event_counter += 1
- self.Super.on_data_available(self, buf)
-
- def close(self):
- self.client = None
- self.Super.close(self)
-
-
-class SyncTransport(base.Transport):
- Message = Message
- Connection = BlockingConnection
-
- default_port = DEFAULT_PORT
- connection_errors = (StdConnectionError,
- socket.error,
- exceptions.ConnectionClosed,
- exceptions.ChannelClosed,
- exceptions.LoginError,
- exceptions.NoFreeChannels,
- exceptions.DuplicateConsumerTag,
- exceptions.UnknownConsumerTag,
- exceptions.RecursiveOperationDetected,
- exceptions.ContentTransmissionForbidden,
- exceptions.ProtocolSyntaxError)
- channel_errors = (StdChannelError,
- exceptions.ChannelClosed,
- exceptions.DuplicateConsumerTag,
- exceptions.UnknownConsumerTag,
- exceptions.ProtocolSyntaxError)
- driver_type = 'amqp'
- driver_name = 'pika'
-
- def __init__(self, client, **kwargs):
- self.client = client
- self.default_port = kwargs.get('default_port', self.default_port)
-
- def driver_version(self):
- import pika
- return pika.__version__
-
- def create_channel(self, connection):
- return connection.channel()
-
- def drain_events(self, connection, **kwargs):
- return connection.ensure_drain_events(**kwargs)
-
- def establish_connection(self):
- """Establish connection to the AMQP broker."""
- conninfo = self.client
- for name, default_value in self.default_connection_params.items():
- if not getattr(conninfo, name, None):
- setattr(conninfo, name, default_value)
- credentials = connection.PlainCredentials(conninfo.userid,
- conninfo.password)
- return self.Connection(self.client,
- connection.ConnectionParameters(
- conninfo.hostname, port=conninfo.port,
- virtual_host=conninfo.virtual_host,
- credentials=credentials))
-
- def close_connection(self, connection):
- """Close the AMQP broker connection."""
- connection.close()
-
- @property
- def default_connection_params(self):
- return {'hostname': 'localhost', 'port': self.default_port,
- 'userid': 'guest', 'password': 'guest'}
-
-
-class AsyncoreTransport(SyncTransport):
- Connection = AsyncoreConnection
diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py
deleted file mode 100644
index 4015ecf5..00000000
--- a/kombu/transport/pika2.py
+++ /dev/null
@@ -1,235 +0,0 @@
-"""
-kombu.transport.pika
-====================
-
-Pika transport.
-
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
-"""
-from __future__ import absolute_import
-
-import socket
-
-from operator import attrgetter
-
-from kombu.exceptions import StdConnectionError, StdChannelError
-from kombu.utils.amq_manager import get_manager
-
-from . import base
-
-import pika
-from pika import spec
-from pika.adapters import blocking_connection as blocking
-from pika import exceptions
-
-DEFAULT_PORT = 5672
-BASIC_PROPERTIES = ('content_type', 'content_encoding',
- 'headers', 'delivery_mode', 'priority',
- 'correlation_id', 'reply_to', 'expiration',
- 'message_id', 'timestamp', 'type', 'user_id',
- 'app_id', 'cluster_id')
-
-
-class Message(base.Message):
-
- def __init__(self, channel, amqp_message, **kwargs):
- channel_id, method, props, body = amqp_message
- propdict = dict(zip(BASIC_PROPERTIES,
- attrgetter(*BASIC_PROPERTIES)(props)))
-
- kwargs.update({'body': body,
- 'delivery_tag': method.delivery_tag,
- 'content_type': props.content_type,
- 'content_encoding': props.content_encoding,
- 'headers': props.headers,
- 'properties': propdict,
- 'delivery_info': dict(
- consumer_tag=getattr(method, 'consumer_tag', None),
- routing_key=method.routing_key,
- delivery_tag=method.delivery_tag,
- redelivered=method.redelivered,
- exchange=method.exchange)})
-
- super(Message, self).__init__(channel, **kwargs)
-
-
-class Channel(blocking.BlockingChannel, base.StdChannel):
- Message = Message
-
- def basic_get(self, queue, no_ack):
- method = super(Channel, self).basic_get(self, queue=queue,
- no_ack=no_ack)
- # pika returns semi-predicates (GetEmpty/GetOk).
- if isinstance(method, spec.Basic.GetEmpty):
- return
- return None, method, method._properties, method._body
-
- def queue_purge(self, queue=None, nowait=False):
- return super(Channel, self).\
- queue_purge(queue=queue, nowait=nowait).method.message_count
-
- def basic_publish(self, message, exchange, routing_key, mandatory=False,
- immediate=False):
- body, properties = message
- try:
- return super(Channel, self).basic_publish(exchange,
- routing_key,
- body,
- properties,
- mandatory,
- immediate)
- finally:
- # Pika does not automatically flush the outbound buffer
- # TODO async: Needs to support `nowait`.
- self.connection._flush_outbound()
-
- def basic_consume(self, queue, no_ack=False, consumer_tag=None,
- callback=None, nowait=False):
-
- # Kombu callbacks only take a single `message` argument,
- # but pika applies with 4 arguments, so need to wrap
- # these into a single tuple.
- def _callback_decode(channel, method, header, body):
- return callback((channel, method, header, body))
-
- return super(Channel, self).basic_consume(
- _callback_decode, queue, no_ack, False, consumer_tag)
-
- def prepare_message(self, body, priority=None,
- content_type=None, content_encoding=None, headers=None,
- properties=None):
- properties = spec.BasicProperties(priority=priority,
- content_type=content_type,
- content_encoding=content_encoding,
- headers=headers)
- return body, properties
-
- def message_to_python(self, raw_message):
- return self.Message(channel=self, amqp_message=raw_message)
-
- def basic_qos(self, prefetch_size, prefetch_count, a_global=False):
- return super(Channel, self).basic_qos(prefetch_size=prefetch_size,
- prefetch_count=prefetch_count,
- global_=a_global)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exc_info):
- self.close()
-
- def close(self, *args):
- super(Channel, self).close(*args)
- self.connection = None
- if getattr(self, 'handler', None):
- if getattr(self.handler, 'connection', None):
- self.handler.connection.channels.pop(
- self.handler.channel_number, None)
- self.handler.connection = None
- self.handler = None
-
- @property
- def channel_id(self):
- return self.channel_number
-
-
-class Connection(blocking.BlockingConnection):
- Channel = Channel
-
- def __init__(self, client, *args, **kwargs):
- self.client = client
- super(Connection, self).__init__(*args, **kwargs)
-
- def channel(self):
- self._channel_open = False
- cid = self._next_channel_number()
-
- self.callbacks.add(cid, spec.Channel.CloseOk, self._on_channel_close)
- transport = blocking.BlockingChannelTransport(self, cid)
- channel = self._channels[cid] = self.Channel(self, cid, transport)
- channel.connection = self
- return channel
-
- def drain_events(self, timeout=None):
- if timeout:
- prev = self.socket.gettimeout()
- self.socket.settimeout(timeout)
- try:
- self._handle_read()
- finally:
- if timeout:
- self.socket.settimeout(prev)
- self._flush_outbound()
-
- def close(self, *args):
- self.client = None
- super(Connection, self).close(*args)
-
-
-AuthenticationError = getattr(exceptions, 'AuthenticationError',
- getattr(exceptions, 'LoginError'))
-
-
-class Transport(base.Transport):
- Message = Message
- Connection = Connection
-
- default_port = DEFAULT_PORT
- connection_errors = (StdConnectionError,
- socket.error,
- exceptions.ConnectionClosed,
- exceptions.ChannelClosed,
- AuthenticationError,
- exceptions.NoFreeChannels,
- exceptions.DuplicateConsumerTag,
- exceptions.UnknownConsumerTag,
- exceptions.RecursiveOperationDetected,
- exceptions.ProtocolSyntaxError)
- channel_errors = (StdChannelError,
- exceptions.ChannelClosed,
- exceptions.DuplicateConsumerTag,
- exceptions.UnknownConsumerTag,
- exceptions.ProtocolSyntaxError)
- driver_type = 'amqp'
- driver_name = 'pika'
-
- def __init__(self, client, **kwargs):
- self.client = client
- self.default_port = kwargs.get('default_port', self.default_port)
-
- def driver_version(self):
- return pika.__version__
-
- def create_channel(self, connection):
- return connection.channel()
-
- def drain_events(self, connection, **kwargs):
- return connection.drain_events(**kwargs)
-
- def establish_connection(self):
- """Establish connection to the AMQP broker."""
- conninfo = self.client
- for name, default_value in self.default_connection_params.items():
- if not getattr(conninfo, name, None):
- setattr(conninfo, name, default_value)
- credentials = pika.PlainCredentials(conninfo.userid,
- conninfo.password)
- return self.Connection(self.client,
- pika.ConnectionParameters(
- conninfo.hostname, port=conninfo.port,
- virtual_host=conninfo.virtual_host,
- credentials=credentials))
-
- def close_connection(self, connection):
- """Close the AMQP broker connection."""
- connection.close()
-
- def get_manager(self, *args, **kwargs):
- return get_manager(self.client, *args, **kwargs)
-
- @property
- def default_connection_params(self):
- return {'hostname': 'localhost', 'port': self.default_port,
- 'userid': 'guest', 'password': 'guest'}
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index d03178e0..eec7d744 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -4,9 +4,6 @@ kombu.transport.pyamqp
pure python amqp transport.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -23,7 +20,7 @@ from . import base
DEFAULT_PORT = 5672
-if amqp.VERSION < (0, 9, 3):
+if amqp.VERSION < (0, 9, 3): # pragma: no cover
raise VersionMismatch('Please install amqp version 0.9.3 or higher.')
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 4badf8fd..dc242bae 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -4,9 +4,6 @@ kombu.transport.redis
Redis transport.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -399,8 +396,8 @@ class Channel(virtual.Channel):
c = self.subclient
if c.connection._sock is None:
c.connection.connect()
- self.subclient.subscribe(keys)
self._in_listen = True
+ self.subclient.subscribe(keys)
def _handle_message(self, client, r):
if r[0] == 'unsubscribe' and r[2] == 0:
@@ -432,8 +429,8 @@ class Channel(virtual.Channel):
return
keys = [self._q_for_pri(queue, pri) for pri in PRIORITY_STEPS
for queue in queues] + [timeout or 0]
- self.client.connection.send_command('BRPOP', *keys)
self._in_poll = True
+ self.client.connection.send_command('BRPOP', *keys)
def _brpop_read(self, **options):
try:
@@ -563,7 +560,7 @@ class Channel(virtual.Channel):
pass
super(Channel, self).close()
- def _create_client(self):
+ def _connparams(self):
conninfo = self.connection.client
database = conninfo.virtual_host
if not isinstance(database, int):
@@ -576,15 +573,16 @@ class Channel(virtual.Channel):
except ValueError:
raise ValueError(
'Database name must be int between 0 and limit - 1')
+ return {'host': conninfo.hostname or '127.0.0.1',
+ 'port': conninfo.port or DEFAULT_PORT,
+ 'db': database,
+ 'password': conninfo.password}
- return self.Client(host=conninfo.hostname or '127.0.0.1',
- port=conninfo.port or DEFAULT_PORT,
- db=database,
- password=conninfo.password,
- connection_pool=self.pool)
+ def _create_client(self):
+ return self.Client(connection_pool=self.pool)
def _get_pool(self):
- return redis.ConnectionPool()
+ return redis.ConnectionPool(**self._connparams())
def _get_client(self):
if redis.VERSION < (2, 4, 4):
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 4bf13f85..d7258b2e 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -6,9 +6,6 @@ Virtual transport implementation.
Emulates the AMQ API for non-AMQ transports.
-:copyright: (c) 2009, 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import, unicode_literals
@@ -176,7 +173,7 @@ class QoS(object):
try:
self.channel._restore(message)
- except BaseException as exc:
+ except BaseException, exc:
errors.append((exc, message))
delivered.clear()
return errors
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index fed253ba..bbc6a7b1 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -5,9 +5,6 @@ kombu.transport.virtual.exchange
Implementations of the standard exchanges defined
by the AMQ protocol (excluding the `headers` exchange).
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py
index 7304f67b..bf92a3a8 100644
--- a/kombu/transport/virtual/scheduling.py
+++ b/kombu/transport/virtual/scheduling.py
@@ -4,9 +4,6 @@
Consumer utilities.
- :copyright: (c) 2009 - 2012 by Ask Solem.
- :license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py
index 1c7ff4ae..c03a78fc 100644
--- a/kombu/transport/zmq.py
+++ b/kombu/transport/zmq.py
@@ -11,7 +11,11 @@ import errno
import os
import socket
-import zmq
+try:
+ import zmq
+ from zmq import ZMQError
+except ImportError:
+ zmq = ZMQError = None # noqa
from kombu.exceptions import StdConnectionError, StdChannelError
from kombu.five import Empty
@@ -128,9 +132,9 @@ class Client(object):
def get(self, queue=None, timeout=None):
try:
return self.sink.recv(flags=zmq.NOBLOCK)
- except zmq.ZMQError as exc:
- if exc.errno == zmq.EAGAIN:
- raise socket.error(errno.EAGAIN, exc.strerror)
+ except ZMQError, e:
+ if e.errno == zmq.EAGAIN:
+ raise socket.error(errno.EAGAIN, e.strerror)
else:
raise
@@ -177,7 +181,7 @@ class Channel(virtual.Channel):
def _get(self, queue, timeout=None):
try:
return loads(self.client.get(queue, timeout))
- except socket.error as exc:
+ except socket.error, exc:
if exc.errno == errno.EAGAIN and timeout != 0:
raise Empty()
else:
@@ -226,7 +230,7 @@ class Transport(virtual.Transport):
driver_type = 'zeromq'
driver_name = 'zmq'
- connection_errors = (StdConnectionError, zmq.ZMQError,)
+ connection_errors = (StdConnectionError, ZMQError,)
channel_errors = (StdChannelError, )
supports_ev = True
@@ -234,8 +238,9 @@ class Transport(virtual.Transport):
nb_keep_draining = True
def __init__(self, *args, **kwargs):
+ if zmq is None:
+ raise ImportError('The zmq library is not installed')
super(Transport, self).__init__(*args, **kwargs)
-
self.cycle = MultiChannelPoller()
def driver_version(self):
@@ -258,8 +263,8 @@ class Transport(virtual.Transport):
for channel in connection.channels:
try:
evt = channel.cycle.get(timeout=timeout)
- except socket.error as exc:
- if exc.errno == errno.EAGAIN:
+ except socket.error, e:
+ if e.errno == errno.EAGAIN:
continue
raise
else:
diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py
index d7655797..8eb61185 100644
--- a/kombu/transport/zookeeper.py
+++ b/kombu/transport/zookeeper.py
@@ -34,7 +34,6 @@ Zookeeper transport.
"""
from __future__ import absolute_import
-import kazoo
import socket
from anyjson import loads, dumps
@@ -44,6 +43,41 @@ from kombu.five import Empty
from . import virtual
+try:
+ import kazoo
+
+ KZ_CONNECTION_ERRORS = (
+ kazoo.zkclient.SystemErrorException,
+ kazoo.zkclient.ConnectionLossException,
+ kazoo.zkclient.MarshallingErrorException,
+ kazoo.zkclient.UnimplementedException,
+ kazoo.zkclient.OperationTimeoutException,
+ kazoo.zkclient.NoAuthException,
+ kazoo.zkclient.InvalidACLException,
+ kazoo.zkclient.AuthFailedException,
+ kazoo.zkclient.SessionExpiredException,
+ )
+
+ KZ_CHANNEL_ERRORS = (
+ kazoo.zkclient.RuntimeInconsistencyException,
+ kazoo.zkclient.DataInconsistencyException,
+ kazoo.zkclient.BadArgumentsException,
+ kazoo.zkclient.MarshallingErrorException,
+ kazoo.zkclient.UnimplementedException,
+ kazoo.zkclient.OperationTimeoutException,
+ kazoo.zkclient.ApiErrorException,
+ kazoo.zkclient.NoNodeException,
+ kazoo.zkclient.NoAuthException,
+ kazoo.zkclient.NodeExistsException,
+ kazoo.zkclient.NoChildrenForEphemeralsException,
+ kazoo.zkclient.NotEmptyException,
+ kazoo.zkclient.SessionExpiredException,
+ kazoo.zkclient.InvalidCallbackException,
+ )
+except ImportError:
+ kazoo = None # noqa
+ KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = () # noqa
+
DEFAULT_PORT = 2181
__author__ = 'Mahendra M <mahendra.m@gmail.com>'
@@ -134,35 +168,16 @@ class Transport(virtual.Transport):
Channel = Channel
polling_interval = 1
default_port = DEFAULT_PORT
- connection_errors = (StdConnectionError,
- socket.error,
- kazoo.zkclient.SystemErrorException,
- kazoo.zkclient.ConnectionLossException,
- kazoo.zkclient.MarshallingErrorException,
- kazoo.zkclient.UnimplementedException,
- kazoo.zkclient.OperationTimeoutException,
- kazoo.zkclient.NoAuthException,
- kazoo.zkclient.InvalidACLException,
- kazoo.zkclient.AuthFailedException,
- kazoo.zkclient.SessionExpiredException)
-
- channel_errors = (StdChannelError,
- kazoo.zkclient.RuntimeInconsistencyException,
- kazoo.zkclient.DataInconsistencyException,
- kazoo.zkclient.BadArgumentsException,
- kazoo.zkclient.MarshallingErrorException,
- kazoo.zkclient.UnimplementedException,
- kazoo.zkclient.OperationTimeoutException,
- kazoo.zkclient.ApiErrorException,
- kazoo.zkclient.NoNodeException,
- kazoo.zkclient.NoAuthException,
- kazoo.zkclient.NodeExistsException,
- kazoo.zkclient.NoChildrenForEphemeralsException,
- kazoo.zkclient.NotEmptyException,
- kazoo.zkclient.SessionExpiredException,
- kazoo.zkclient.InvalidCallbackException)
+ connection_errors = (StdConnectionError, ) + KZ_CONNECTION_ERRORS
+ channel_errors = (StdChannelError, socket.error) + KZ_CHANNEL_ERRORS
driver_type = 'zookeeper'
driver_name = 'kazoo'
+ def __init__(self, *args, **kwargs):
+ if kazoo is None:
+ raise ImportError('The kazoo library is not installed')
+
+ super(Transport, self).__init__(*args, **kwargs)
+
def driver_version(self):
return kazoo.__version__
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index e18cc269..70f75573 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -4,9 +4,6 @@ kombu.utils
Internal utilities.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import, print_function
@@ -221,17 +218,17 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
for retries in count():
try:
return fun(*args, **kwargs)
- except catch as exc:
+ except catch, exc:
if max_retries is not None and retries > max_retries:
raise
if callback:
callback()
tts = errback(exc, interval_range, retries) if errback else None
if tts:
- for i in fxrange(stop=tts):
- if i and callback:
+ for i in range(int(tts / interval_step)):
+ if callback:
callback()
- sleep(i)
+ sleep(interval_step)
def emergency_dump_state(state, open_file=open, dump=None):
diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py
index 0bb9ce4c..62a7a95a 100644
--- a/kombu/utils/amq_manager.py
+++ b/kombu/utils/amq_manager.py
@@ -6,10 +6,10 @@ def get_manager(client, hostname=None, port=None, userid=None,
import pyrabbit
opt = client.transport_options.get
host = (hostname if hostname is not None
- else opt('manager_hostname', client.hostname))
+ else opt('manager_hostname', client.hostname or 'localhost'))
port = port if port is not None else opt('manager_port', 55672)
return pyrabbit.Client('%s:%s' % (host, port),
userid if userid is not None
- else opt('manager_userid', client.userid),
+ else opt('manager_userid', client.userid or 'guest'),
password if password is not None
- else opt('manager_password', client.password))
+ else opt('manager_password', client.password or 'guest'))
diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py
index 2167f03c..a2f2e984 100644
--- a/kombu/utils/compat.py
+++ b/kombu/utils/compat.py
@@ -4,9 +4,6 @@ kombu.utils.compat
Helps compatibility with older Python versions.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py
index 85793c21..aeea4e4f 100644
--- a/kombu/utils/debug.py
+++ b/kombu/utils/debug.py
@@ -4,9 +4,6 @@ kombu.utils.debug
Debugging support.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py
index 83acc1a4..64eb5a43 100644
--- a/kombu/utils/encoding.py
+++ b/kombu/utils/encoding.py
@@ -7,9 +7,6 @@ Utilities to encode text, and to safely emit text from running
applications without crashing with the infamous :exc:`UnicodeDecodeError`
exception.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 5e276a3d..9b5458fa 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -4,9 +4,6 @@ kombu.utils.eventio
Evented IO support for multiple platforms.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
@@ -73,7 +70,7 @@ class Poller(object):
def poll(self, timeout):
try:
return self._poll(timeout)
- except Exception as exc:
+ except Exception, exc:
if get_errno(exc) != errno.EINTR:
raise
@@ -86,7 +83,7 @@ class _epoll(Poller):
def register(self, fd, events):
try:
self._epoll.register(fd, events)
- except Exception as exc:
+ except Exception, exc:
if get_errno(exc) != errno.EEXIST:
raise
@@ -97,7 +94,7 @@ class _epoll(Poller):
pass
except ValueError:
pass
- except IOError as exc:
+ except IOError, exc:
if get_errno(exc) != errno.ENOENT:
raise
diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py
index c488e083..2fe8e2e5 100644
--- a/kombu/utils/limits.py
+++ b/kombu/utils/limits.py
@@ -4,9 +4,6 @@ kombu.utils.limits
Token bucket implementation for rate limiting.
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
"""
from __future__ import absolute_import
diff --git a/requirements/funtest.txt b/requirements/funtest.txt
index 8fdd9bef..6ac859b7 100644
--- a/requirements/funtest.txt
+++ b/requirements/funtest.txt
@@ -7,9 +7,6 @@ pymongo
# CouchDB transport
couchdb
-# Pika transport
-pika==0.5.2
-
# Beanstalk transport
beanstalkc
diff --git a/setup.cfg b/setup.cfg
index f73cd556..4abe2003 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,15 +6,17 @@ cover3-branch = 1
cover3-html = 1
cover3-package = kombu
cover3-exclude = kombu
+ kombu.transport.mongodb
+ kombu.transport.filesystem
kombu.utils.compat
kombu.utils.eventio
kombu.utils.finalize
- kombu.transport.pika
kombu.transport.couchdb
- kombu.transport.mongodb
kombu.transport.beanstalk
+ kombu.transport.sqlalchemy
kombu.transport.SQS
kombu.transport.zookeeper
+ kombu.transport.zmq
[build_sphinx]
source-dir = docs/
diff --git a/tox.ini b/tox.ini
index 155b50c2..68be6956 100644
--- a/tox.ini
+++ b/tox.ini
@@ -42,8 +42,6 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
--cover3-xml-file={toxinidir}/coverage.xml \
--cover3-package=kombu \
--cover3-exclude="kombu kombu.utils.* \
- kombu.transport.pypika \
- kombu.transport.pycouchdb \
kombu.transport.mongodb \
kombu.transport.beanstalk \
kombu.transport.zookeeper" \