diff options
86 files changed, 1241 insertions, 976 deletions
@@ -46,6 +46,7 @@ Mher Movsisyan <mher.movsisyan@gmail.com> Michael Barrett <mb@eventbrite.com> Nitzan Miron <bug.assembla@bugbug.me> Noah Kantrowitz <noah@coderanger.net> +Ollie Walsh <ollie.walsh@geemail.kom> Pascal Hartig <phartig@rdrei.net> Patrick Schneider <patrick.p2k.schneider@gmail.com> Paul McLanahan <paul@mclanahan.net> @@ -8,7 +8,7 @@ 2.5.0 ===== -:release-date: TBA +:release-date: 2012-11-27 04:00 P.M UTC - `py-amqp`_ is now the new default transport, replacing ``amqplib``. @@ -27,7 +27,7 @@ be compiled. librabbitmq will be updated to support all the same features as py-amqp. -- Support for multiple connection URL's used for failover. +- Support for using multiple connection URL's for failover. The first argument to :class:`~kombu.Connection` can now be a list of connection URLs: @@ -62,7 +62,7 @@ - Now supports PyDev, PyCharm, pylint and other static code analysis tools. -- :class:`~kombu.entity.Queue` now supports multiple bindings. +- :class:`~kombu.Queue` now supports multiple bindings. You can now have multiple bindings in the same queue by having the second argument be a list: @@ -79,8 +79,8 @@ To enable this, helper methods have been added: - - :meth:`~kombu.entity.Queue.bind_to` - - :meth:`~kombu.entity.Queue.unbind_from` + - :meth:`~kombu.Queue.bind_to` + - :meth:`~kombu.Queue.unbind_from` Contributed by Rumyana Neykova. @@ -95,7 +95,7 @@ the :mod:`kombu.common` module. - Consumer now supports a ``on_message`` callback that can be used to process - raw undecoded messages. + raw messages (not decoded). Other callbacks specified using the ``callbacks`` argument, and the ``receive`` method will be not be called when a on message callback @@ -108,7 +108,7 @@ - Support for exchange-to-exchange bindings. - The :class:`~kombu.entity.Exchange` entity gained ``bind_to`` + The :class:`~kombu.Exchange` entity gained ``bind_to`` and ``unbind_from`` methods: .. code-block:: python @@ -123,32 +123,43 @@ Contributed by Rumyana Neykova. -- Redis: Unacked message restore limit is now unlimited by default. +.. _version-2.4.10: - Also, the limit can now be configured using the ``unacked_restore_limit`` - transport option: +2.4.10 +====== +:release-date: 2012-11-22 06:00 P.M UTC - .. code-block:: python +- The previous versions connection pool changes broke Redis support so that + it would always connect to localhost (default setting) no matter what + connection parameters were provided (Issue #176). - Connection('redis://', transport_options={ - 'unacked_restore_limit': 100, - }) +.. _version-2.4.9: - A limit of 100 means that the consumer will restore at most 100 - messages at each pass. +2.4.9 +===== +:release-date: 2012-11-21 03:00 P.M UTC -- Redis: Now uses a mutex to ensure only one consumer restores messages at a - time. +- Redis: Fixed race condition that could occur while trying to restore + messages (Issue #171). - The mutex expires after 5 minutes by default, but can be configured - using the ``unacked_mutex_expire`` transport option. + Fix contributed by Ollie Walsh. - Redis: Each channel is now using a specific connection pool instance, which is disconnected on connection failure. -- amqplib: Fixed bug with timeouts when SSL is used in non-blocking mode. +- ProducerPool: Fixed possible dead-lock in the acquire method. - Fix contributed by Mher Movsisyan +- ProducerPool: ``force_close_all`` no longer tries to call the non-existent + ``Producer._close``. + +- librabbitmq: Now implements ``transport.verify_connection`` so that + connection pools will not give back connections that are no longer working. + +- New and better ``repr()`` for Queue and Exchange objects. + +- Python3: Fixed problem with running the unit test suite. + +- Python3: Fixed problem with JSON codec. .. _version-2.4.8: @@ -156,14 +167,29 @@ ===== :release-date: 2012-11-02 05:00 P.M UTC -- Redis: Fair queue cyle implementation improved (Issue #166). +- Redis: Improved fair queue cycle implementation (Issue #166). Contributed by Kevin McCarthy. -- Redis: Number of messages to restore in one iteration is now unlimited, - but can be configured using the unacked_restore_limit transport option. +- Redis: Unacked message restore limit is now unlimited by default. -- Redis: A Redis based mutex is now used while restoring messages. + Also, the limit can now be configured using the ``unacked_restore_limit`` + transport option: + + .. code-block:: python + + Connection('redis://', transport_options={ + 'unacked_restore_limit': 100, + }) + + A limit of 100 means that the consumer will restore at most 100 + messages at each pass. + +- Redis: Now uses a mutex to ensure only one consumer restores messages at a + time. + + The mutex expires after 5 minutes by default, but can be configured + using the ``unacked_mutex_expire`` transport option. - LamportClock.adjust now returns the new clock value. @@ -177,6 +203,10 @@ Fix contributed by Jasper Bryant-Greene +- amqplib: Fixed bug with timeouts when SSL is used in non-blocking mode. + + Fix contributed by Mher Movsisyan + .. _version-2.4.7: @@ -627,6 +657,8 @@ News Contributed by Mahendra M. +.. _`Apache ZooKeeper`: http://zookeeper.apache.org/ + - Redis: Priority support. The message's ``priority`` field is now respected by the Redis @@ -1,3 +1,4 @@ +Copyright (c) 2012 VMware, Inc. All rights reserved. Copyright (c) 2009-2012, Ask Solem & contributors. All rights reserved. @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.5.0rc1 +:Version: 2.5.0 `Kombu` is a messaging framework for Python. @@ -68,7 +68,6 @@ and the `Wikipedia article about AMQP`_. .. _`Beanstalk`: http://kr.github.com/beanstalkd/ .. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/ .. _`amqplib`: http://barryp.org/software/py-amqplib/ -.. _`pika`: http://github.com/pika/pika .. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP .. _`carrot`: http://pypi.python.org/pypi/carrot/ .. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq @@ -90,8 +89,6 @@ Transport Comparison +---------------+----------+------------+------------+---------------+ | *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | +---------------+----------+------------+------------+---------------+ -| *pika* | Native | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ | *couchdb* | Virtual | Yes | Yes [#f1]_ | No | +---------------+----------+------------+------------+---------------+ | *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | diff --git a/docs/conf.py b/docs/conf.py index af7be574..27534746 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -46,7 +46,7 @@ exclude_trees = ['.build'] add_function_parentheses = True # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'trac' +pygments_style = 'colorful' # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 1cdb4bc2..c09075a0 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -25,9 +25,6 @@ kombu.transport kombu.transport.pyamqp kombu.transport.librabbitmq - kombu.transport.pika - kombu.transport.pika2 - kombu.transport.amqplib kombu.transport.memory kombu.transport.redis kombu.transport.zmq @@ -43,15 +40,18 @@ kombu.transport.sqlalchemy kombu.transport.sqlalchemy.models kombu.transport.SQS + kombu.transport.amqplib kombu.transport.base kombu.transport.virtual kombu.transport.virtual.exchange kombu.transport.virtual.scheduling kombu.serialization kombu.utils + kombu.utils.eventio kombu.utils.limits kombu.utils.compat kombu.utils.debug kombu.utils.encoding kombu.utils.functional kombu.utils.url + kombu.utils.amq_manager diff --git a/docs/reference/kombu.rst b/docs/reference/kombu.rst index 71aa62ec..46fceeaf 100644 --- a/docs/reference/kombu.rst +++ b/docs/reference/kombu.rst @@ -1,11 +1,11 @@ .. currentmodule:: kombu -.. automodule:: kombu +.. contents:: + :local: - .. contents:: - :local: +.. automodule:: kombu - Connection + Connection ---------- .. autoclass:: Connection @@ -73,8 +73,6 @@ .. automethod:: SimpleQueue .. automethod:: SimpleBuffer - - Exchange -------- @@ -110,9 +108,9 @@ Example creating a queue using our exchange in the :class:`Exchange` example:: - >>> science_news = Queue("science_news", + >>> science_news = Queue('science_news', ... exchange=news_exchange, - ... routing_key="news.science") + ... routing_key='news.science') For now `science_news` is just a declaration, you can't perform actions on it. It just describes the name and options for the queue. @@ -124,7 +122,9 @@ >>> bound_science_news = science_news(channel) - Now you can perform operations like :meth:`declare` or :meth:`purge`:: + Now you can perform operations like :meth:`declare` or :meth:`purge`: + + .. code-block:: python >>> bound_sicence_news.declare() >>> bound_science_news.purge() diff --git a/docs/reference/kombu.simple.rst b/docs/reference/kombu.simple.rst index bee729df..b43e2ec8 100644 --- a/docs/reference/kombu.simple.rst +++ b/docs/reference/kombu.simple.rst @@ -28,7 +28,7 @@ .. attribute:: queue - :class:`~kombu.entity.Queue` to consume from (if consuming). + :class:`~kombu.Queue` to consume from (if consuming). .. attribute:: queue_opts @@ -69,7 +69,7 @@ .. attribute:: queue - :class:`~kombu.entity.Queue` to consume from (if consuming). + :class:`~kombu.Queue` to consume from (if consuming). .. attribute:: queue_opts diff --git a/docs/reference/kombu.transport.pika.rst b/docs/reference/kombu.transport.pika.rst deleted file mode 100644 index 783af0d6..00000000 --- a/docs/reference/kombu.transport.pika.rst +++ /dev/null @@ -1,46 +0,0 @@ -======================================= - kombu.transport.pika -======================================= - -.. currentmodule:: kombu.transport.pika - -.. automodule:: kombu.transport.pika - - .. contents:: - :local: - - Transports - ---------- - - .. autoclass:: AsyncoreTransport - :members: - :undoc-members: - - .. autoclass:: SyncTransport - :members: - :undoc-members: - - Connections - ----------- - - .. autoclass:: AsyncoreConnection - :members: - :undoc-members: - - .. autoclass:: BlockingConnection - :members: - :undoc-members: - - Channel - ------- - - .. autoclass:: Channel - :members: - :undoc-members: - - Message - ------- - - .. autoclass:: Message - :members: - :undoc-members: diff --git a/docs/reference/kombu.transport.pika2.rst b/docs/reference/kombu.transport.pika2.rst deleted file mode 100644 index b06dd0a7..00000000 --- a/docs/reference/kombu.transport.pika2.rst +++ /dev/null @@ -1,34 +0,0 @@ -.. currentmodule:: kombu.transport.pika2 - -.. automodule:: kombu.transport.pika2 - - .. contents:: - :local: - - Transport - --------- - - .. autoclass:: Transport - :members: - :undoc-members: - - Connection - ---------- - - .. autoclass:: Connection - :members: - :undoc-members: - - Channel - ------- - - .. autoclass:: Channel - :members: - :undoc-members: - - Message - ------- - - .. autoclass:: Message - :members: - :undoc-members: diff --git a/docs/reference/kombu.transport.zmq.rst b/docs/reference/kombu.transport.zmq.rst index 0735840b..08d0ea5e 100644 --- a/docs/reference/kombu.transport.zmq.rst +++ b/docs/reference/kombu.transport.zmq.rst @@ -1,3 +1,7 @@ +===================== + kombu.transport.zmq +===================== + .. currentmodule:: kombu.transport.zmq .. automodule:: kombu.transport.zmq diff --git a/docs/reference/kombu.transport.zookeeper.rst b/docs/reference/kombu.transport.zookeeper.rst index 976ad6af..af900a35 100644 --- a/docs/reference/kombu.transport.zookeeper.rst +++ b/docs/reference/kombu.transport.zookeeper.rst @@ -1,3 +1,7 @@ +=========================== + kombu.transport.zookeeper +=========================== + .. currentmodule:: kombu.transport.zookeeper .. automodule:: kombu.transport.zookeeper diff --git a/docs/reference/kombu.utils.eventio.rst b/docs/reference/kombu.utils.eventio.rst new file mode 100644 index 00000000..16c40f33 --- /dev/null +++ b/docs/reference/kombu.utils.eventio.rst @@ -0,0 +1,11 @@ +========================================================== + Evented I/O - kombu.utils.eventio +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.eventio + +.. automodule:: kombu.utils.eventio + :members: + :undoc-members: diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index d7275ea4..aa88add8 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -116,10 +116,7 @@ keyword arguments, these are: :ssl: Use SSL to connect to the server. Default is ``False``. Only supported by the amqp transport. :insist: Insist on connecting to a server. - In a configuration with multiple load-sharing servers, the insist - option tells the server that the client is insisting on a connection - to the specified server. Default is ``False``. - Only supported by the amqp and pika transports, and not by AMQP 0-9-1. + *No longer supported, relic from AMQP 0.8* :connect_timeout: Timeout in seconds for connecting to the server. May not be supported by the specified transport. :transport_options: A dict of additional connection arguments to @@ -142,8 +139,6 @@ Transport Comparison +---------------+----------+------------+------------+---------------+ | *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | +---------------+----------+------------+------------+---------------+ -| *pika* | Native | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ | *couchdb* | Virtual | Yes | Yes [#f1]_ | No | +---------------+----------+------------+------------+---------------+ | *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst index 74ab79cf..5e9d3d61 100644 --- a/docs/userguide/consumers.rst +++ b/docs/userguide/consumers.rst @@ -1,4 +1,3 @@ -.. currentmodule:: kombu.messaging .. _guide-consumers: =========== @@ -62,7 +61,7 @@ and with multiple channels again: .. code-block:: python - from kombu.messaging import Consumer + from kombu import Consumer from kombu.mixins import ConsumerMixin class C(ConsumerMixin): @@ -88,8 +87,6 @@ and with multiple channels again: Reference ========= -.. module:: kombu.messaging - -.. autoclass:: Consumer +.. autoclass:: kombu.Consumer :noindex: :members: diff --git a/docs/userguide/producers.rst b/docs/userguide/producers.rst index c6e80625..454b4ca3 100644 --- a/docs/userguide/producers.rst +++ b/docs/userguide/producers.rst @@ -1,4 +1,3 @@ -.. currentmodule:: kombu.messaging .. _guide-producers: =========== @@ -20,8 +19,6 @@ See :ref:`guide-serialization`. Reference ========= -.. module:: kombu.messaging - -.. autoclass:: Producer +.. autoclass:: kombu.Producer :noindex: :members: diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst index b3648d14..3f236861 100644 --- a/docs/userguide/serialization.rst +++ b/docs/userguide/serialization.rst @@ -138,24 +138,24 @@ supported by Kombu. .. admonition:: Buffer Objects -The decoder function of custom serializer must support both strings -and Python's old-style buffer objects. + The decoder function of custom serializer must support both strings + and Python's old-style buffer objects. -Python pickle and json modules usually don't do this via its ``loads`` -function, but you can easily add support by making a wrapper around the -``load`` function that takes file objects instead of strings. + Python pickle and json modules usually don't do this via its ``loads`` + function, but you can easily add support by making a wrapper around the + ``load`` function that takes file objects instead of strings. -Here's an example wrapping :func:`pickle.loads` in such a way: + Here's an example wrapping :func:`pickle.loads` in such a way: -.. code-block:: python + .. code-block:: python - import pickle - from kombu.serialization import BytesIO, register + import pickle + from kombu.serialization import BytesIO, register - def loads(s): - return pickle.load(BytesIO(s)) + def loads(s): + return pickle.load(BytesIO(s)) - register('my_pickle', loads, pickle.dumps, - content_type='application/x-pickle2', - content_encoding='binary') + register('my_pickle', loads, pickle.dumps, + content_type='application/x-pickle2', + content_encoding='binary') diff --git a/docs/userguide/simple.rst b/docs/userguide/simple.rst index 9239f118..69caaf2d 100644 --- a/docs/userguide/simple.rst +++ b/docs/userguide/simple.rst @@ -16,7 +16,7 @@ messaging needs. Instead of defining exchanges and queues, the simple classes only requires two arguments, a connection channel and a name. The name is used as the queue, exchange and routing key. If the need arises, you can specify -a :class:`~kombu.entity.Queue` as the name argument instead. +a :class:`~kombu.Queue` as the name argument instead. In addition, the :class:`~kombu.Connection` comes with shortcuts to create simple queues using the current connection:: diff --git a/examples/simple_receive.py b/examples/simple_receive.py index e15427e3..906c2742 100644 --- a/examples/simple_receive.py +++ b/examples/simple_receive.py @@ -1,6 +1,7 @@ """ Example receiving a message using the SimpleQueue interface. """ + from kombu import Connection #: Create connection diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 807542e9..d9fb691c 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -23,7 +23,7 @@ class Worker(ConsumerMixin): logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs)) try: fun(*args, **kwdict(kwargs)) - except Exception as exc: + except Exception, exc: logger.error('task raised exception: %r', exc) message.ack() diff --git a/extra/release/bump_version.py b/extra/release/bump_version.py index d7b9c6fd..abda578e 100755 --- a/extra/release/bump_version.py +++ b/extra/release/bump_version.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - from __future__ import absolute_import import errno @@ -22,7 +21,7 @@ def cmd(*args): def no_enoent(): try: yield - except OSError as exc: + except OSError, exc: if exc.errno != errno.ENOENT: raise diff --git a/extra/release/doc4allmods b/extra/release/doc4allmods index 4651dcd4..f95fee2f 100755 --- a/extra/release/doc4allmods +++ b/extra/release/doc4allmods @@ -2,7 +2,8 @@ PACKAGE="$1" SKIP_PACKAGES="$PACKAGE tests management urls" -SKIP_FILES="kombu.utils.eventio.rst +SKIP_FILES="kombu.entity.rst + kombu.messaging.rst kombu.transport.django.migrations.rst kombu.transport.django.migrations.0001_initial.rst kombu.transport.django.management.rst diff --git a/extra/release/flakeplus.py b/extra/release/flakeplus.py new file mode 100755 index 00000000..6fe1f1fc --- /dev/null +++ b/extra/release/flakeplus.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +from __future__ import absolute_import +from __future__ import with_statement + +import os +import re +import sys + +from collections import defaultdict +from unipath import Path + +RE_COMMENT = r'^\s*\#' +RE_NOQA = r'.+?\#\s+noqa+' +RE_MULTILINE_COMMENT_O = r'^\s*(?:\'\'\'|""").+?(?:\'\'\'|""")' +RE_MULTILINE_COMMENT_S = r'^\s*(?:\'\'\'|""")' +RE_MULTILINE_COMMENT_E = r'(?:^|.+?)(?:\'\'\'|""")' +RE_WITH = r'(?:^|\s+)with\s+' +RE_WITH_IMPORT = r'''from\s+ __future__\s+ import\s+ with_statement''' +RE_PRINT = r'''(?:^|\s+)print\((?:"|')(?:\W+?)?[A-Z0-9:]{2,}''' +RE_ABS_IMPORT = r'''from\s+ __future__\s+ import\s+ absolute_import''' + +acc = defaultdict(lambda: {"abs": False, "print": False}) + + +def compile(regex): + return re.compile(regex, re.VERBOSE) + + +class FlakePP(object): + re_comment = compile(RE_COMMENT) + re_ml_comment_o = compile(RE_MULTILINE_COMMENT_O) + re_ml_comment_s = compile(RE_MULTILINE_COMMENT_S) + re_ml_comment_e = compile(RE_MULTILINE_COMMENT_E) + re_abs_import = compile(RE_ABS_IMPORT) + re_print = compile(RE_PRINT) + re_with_import = compile(RE_WITH_IMPORT) + re_with = compile(RE_WITH) + re_noqa = compile(RE_NOQA) + map = {"abs": True, "print": False, + "with": False, "with-used": False} + + def __init__(self, verbose=False): + self.verbose = verbose + self.steps = (("abs", self.re_abs_import), + ("with", self.re_with_import), + ("with-used", self.re_with), + ("print", self.re_print)) + + def analyze_fh(self, fh): + steps = self.steps + filename = fh.name + acc = dict(self.map) + index = 0 + errors = [0] + + def error(fmt, **kwargs): + errors[0] += 1 + self.announce(fmt, **dict(kwargs, filename=filename)) + + for index, line in enumerate(self.strip_comments(fh)): + for key, pattern in steps: + if pattern.match(line): + acc[key] = True + if index: + if not acc["abs"]: + error("%(filename)s: missing abs import") + if acc["with-used"] and not acc["with"]: + error("%(filename)s: missing with import") + if acc["print"]: + error("%(filename)s: left over print statement") + + return filename, errors[0], acc + + def analyze_file(self, filename): + with open(filename) as fh: + return self.analyze_fh(fh) + + def analyze_tree(self, dir): + for dirpath, _, filenames in os.walk(dir): + for path in (Path(dirpath, f) for f in filenames): + if path.endswith(".py"): + yield self.analyze_file(path) + + def analyze(self, *paths): + for path in map(Path, paths): + if path.isdir(): + for res in self.analyze_tree(path): + yield res + else: + yield self.analyze_file(path) + + def strip_comments(self, fh): + re_comment = self.re_comment + re_ml_comment_o = self.re_ml_comment_o + re_ml_comment_s = self.re_ml_comment_s + re_ml_comment_e = self.re_ml_comment_e + re_noqa = self.re_noqa + in_ml = False + + for line in fh.readlines(): + if in_ml: + if re_ml_comment_e.match(line): + in_ml = False + else: + if re_noqa.match(line) or re_ml_comment_o.match(line): + pass + elif re_ml_comment_s.match(line): + in_ml = True + elif re_comment.match(line): + pass + else: + yield line + + def announce(self, fmt, **kwargs): + sys.stderr.write((fmt + "\n") % kwargs) + + +def main(argv=sys.argv, exitcode=0): + for _, errors, _ in FlakePP(verbose=True).analyze(*argv[1:]): + if errors: + exitcode = 1 + return exitcode + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/funtests/setup.py b/funtests/setup.py index b024edff..fdb7a98c 100644 --- a/funtests/setup.py +++ b/funtests/setup.py @@ -53,7 +53,6 @@ setup( "pymongo", "couchdb", "kazoo", - "pika", "beanstalkc", "kombu-sqlalchemy", "django", diff --git a/funtests/tests/test_pika.py b/funtests/tests/test_pika.py deleted file mode 100644 index 4dd357b6..00000000 --- a/funtests/tests/test_pika.py +++ /dev/null @@ -1,38 +0,0 @@ -from funtests import transport -from nose import SkipTest - -from kombu.exceptions import VersionMismatch - - -class test_pika_blocking(transport.TransportCase): - transport = "syncpika" - prefix = "syncpika" - - def before_connect(self): - try: - from kombu.transport import pika - except VersionMismatch: - raise SkipTest("Pika version mismatch") - - def test_produce__consume_large_messages(self, *args, **kwargs): - raise SkipTest("test currently fails for sync pika") - - def test_cyclic_reference_channel(self, *args, **kwargs): - raise SkipTest("known memory leak") - - -class test_pika_async(transport.TransportCase): - transport = "pika" - prefix = "pika" - - def before_connect(self): - try: - from kombu.transport import pika - except VersionMismatch: - raise SkipTest("Pika version mismatch") - - def test_produce__consume_large_messages(self, *args, **kwargs): - raise SkipTest("test currently fails for async pika") - - def test_cyclic_reference_channel(self, *args, **kwargs): - raise SkipTest("known memory leak") diff --git a/funtests/transport.py b/funtests/transport.py index 4df10738..5924a72a 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -79,7 +79,7 @@ class TransportCase(unittest.TestCase): if self.transport: try: self.before_connect() - except SkipTest as exc: + except SkipTest, exc: self.skip_test_reason = str(exc) else: self.do_connect() diff --git a/kombu/__init__.py b/kombu/__init__.py index ed9ca1e8..0a482802 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 5, 0, 'rc1') +VERSION = (2, 5, 0) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' @@ -23,7 +23,7 @@ if sys.version_info < (2, 5): # pragma: no cover STATICA_HACK = True globals()['kcah_acitats'[::-1].upper()] = False -if STATICA_HACK: +if STATICA_HACK: # pragma: no cover # This is never executed, but tricks static analyzers (PyDev, PyCharm, # pylint, etc.) into knowing the types of these symbols, and what # they contain. @@ -74,7 +74,7 @@ class module(ModuleType): # 2.5 does not define __package__ try: package = __package__ -except NameError: +except NameError: # pragma: no cover package = 'kombu' # keep a reference to this module so that it's not garbage collected @@ -94,7 +94,7 @@ new_module.__dict__.update({ '__package__': package, 'VERSION': VERSION}) -if os.environ.get('KOMBU_LOG_DEBUG'): +if os.environ.get('KOMBU_LOG_DEBUG'): # pragma: no cover os.environ.update(KOMBU_LOG_CHANNEL='1', KOMBU_LOG_CONNECTION='1') from .utils import debug debug.setup_logging() diff --git a/kombu/abstract.py b/kombu/abstract.py index f0968c1a..382505dd 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -4,9 +4,6 @@ kombu.compression Object utilities. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -14,7 +11,6 @@ from copy import copy from .connection import maybe_channel from .exceptions import NotBoundError -from .five import items from .utils import ChannelPromise __all__ = ['Object', 'MaybeChannelBound'] @@ -41,11 +37,6 @@ class Object(object): except AttributeError: setattr(self, name, None) - def setdefault(self, **defaults): - for key, value in items(defaults): - if getattr(self, key) is None: - setattr(self, key, value) - def as_dict(self, recurse=False): def f(obj): if recurse and isinstance(obj, Object): @@ -101,7 +92,8 @@ class MaybeChannelBound(Object): def __repr__(self, item=''): item = item or type(self).__name__ if self.is_bound: - return '<bound {0} of {1}>'.format(item, self.channel) + return '<{0} bound to chan:{1}>'.format( + item or type(self).__name__, self.channel.channel_id) return '<unbound {0}>'.format(item) @property diff --git a/kombu/clocks.py b/kombu/clocks.py index 1afff225..8d85bf21 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -4,9 +4,6 @@ kombu.clocks Logical Clocks and Synchronization. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -71,9 +68,6 @@ class LamportClock(object): self.value += 1 return self.value - def sort(self, d): - return d[sorted(d)[0]] - def sort_heap(self, h): """List of tuples containing at least two elements, representing an event, where the first element is the event's scalar clock value, @@ -89,13 +83,13 @@ class LamportClock(object): """ if h[0][0] == h[1][0]: same = [] - for i, PN in zip(h, islice(h, 1, None)): + for PN in zip(h, islice(h, 1, None)): if PN[0][0] != PN[1][0]: break # Prev and Next's clocks differ same.append(PN[0]) # return first item sorted by process id return sorted(same, key=lambda event: event[1])[0] - # all clock values unique, return first item + # clock values unique, return first item return h[0] def __str__(self): diff --git a/kombu/common.py b/kombu/common.py index 927f4829..c6f9a6e0 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -4,9 +4,6 @@ kombu.common Common Utilities. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -67,7 +64,7 @@ class Broadcast(Queue): :keyword queue: By default a unique id is used for the queue name for every consumer. You can specify a custom queue name here. - :keyword \*\*kwargs: See :class:`~kombu.entity.Queue` for a list + :keyword \*\*kwargs: See :class:`~kombu.Queue` for a list of additional keyword arguments supported. """ @@ -169,9 +166,9 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False): try: yield conn.drain_events(timeout=timeout) except socket.timeout: - if timeout and not ignore_timeouts: + if timeout and not ignore_timeouts: # pragma: no cover raise - except socket.error: + except socket.error: # pragma: no cover pass diff --git a/kombu/compat.py b/kombu/compat.py index bdd876b9..7a588880 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -6,9 +6,6 @@ Carrot compatible interface for :class:`Publisher` and :class:`Producer`. See http://packages.python.org/pypi/carrot for documentation. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/compression.py b/kombu/compression.py index c76adfc8..34daa7a2 100644 --- a/kombu/compression.py +++ b/kombu/compression.py @@ -4,9 +4,6 @@ kombu.compression Compression utilities. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/connection.py b/kombu/connection.py index e23be740..c4c11ac2 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -4,9 +4,6 @@ kombu.connection Broker connection and pools. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -146,9 +143,9 @@ class Connection(object): failover_strategy = 'round-robin' #: Heartbeat value, currently only supported by the py-amqp transport. - hertbeat = None + heartbeat = None - userid = password = ssl = login_method = None + hostname = userid = password = ssl = login_method = None def __init__(self, hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, @@ -293,7 +290,7 @@ class Connection(object): except socket.timeout: self.more_to_read = False return False - except socket.error as exc: + except socket.error, exc: if exc.errno in (errno.EAGAIN, errno.EINTR): self.more_to_read = False return False @@ -383,8 +380,8 @@ class Connection(object): self.maybe_close_channel(self._default_channel) self._default_channel = None - def _default_ensure_callback(exc, interval): - logger.error("Ensure: Couldn't send message: %r. Retry in %ss", + def _default_ensure_callback(self, exc, interval): + logger.error("Ensure: Operation error: %r. Retry in %ss", exc, interval, exc_info=True) def ensure(self, obj, fun, errback=None, max_retries=None, @@ -427,7 +424,7 @@ class Connection(object): for retries in count(0): # for infinity try: return fun(*args, **kwargs) - except self.recoverable_connection_errors as exc: + except self.recoverable_connection_errors, exc: if got_connection: raise if max_retries is not None and retries > max_retries: @@ -450,7 +447,7 @@ class Connection(object): if on_revive: on_revive(new_channel) got_connection += 1 - except self.recoverable_channel_errors as exc: + except self.recoverable_channel_errors, exc: if max_retries is not None and retries > max_retries: raise self._debug('ensure channel error: %r', exc, exc_info=1) @@ -651,14 +648,14 @@ class Connection(object): created using that name as the name of the queue and exchange, also it will be used as the default routing key. - :param name: Name of the queue/or a :class:`~kombu.entity.Queue`. + :param name: Name of the queue/or a :class:`~kombu.Queue`. :keyword no_ack: Disable acknowledgements. Default is false. :keyword queue_opts: Additional keyword arguments passed to the constructor of the automatically created - :class:`~kombu.entity.Queue`. + :class:`~kombu.Queue`. :keyword exchange_opts: Additional keyword arguments passed to the constructor of the automatically created - :class:`~kombu.entity.Exchange`. + :class:`~kombu.Exchange`. :keyword channel: Channel to use. If not specified a new channel from the current connection will be used. Remember to call :meth:`~kombu.simple.SimpleQueue.close` when done with the @@ -856,7 +853,11 @@ class Resource(object): except Empty: self._add_when_empty() else: - R = self.prepare(R) + try: + R = self.prepare(R) + except BaseException: + self.release(R) + raise self._dirty.add(R) break else: @@ -932,7 +933,7 @@ class Resource(object): except AttributeError: pass # Issue #78 finally: - if mutex: + if mutex: # pragma: no cover mutex.release() if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover @@ -974,7 +975,10 @@ class ConnectionPool(Resource): return self.connection.clone() def release_resource(self, resource): - resource._debug('released') + try: + resource._debug('released') + except AttributeError: + pass def close_resource(self, resource): resource._close() diff --git a/kombu/entity.py b/kombu/entity.py index e7ec141c..84a7a445 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -4,9 +4,6 @@ kombu.entity Exchange and Queue declarations. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -266,8 +263,10 @@ class Exchange(MaybeChannelBound): return False def __repr__(self): - return super(Exchange, self).__repr__('Exchange %s(%s)' % (self.name, - self.type)) + return super(Exchange, self).__repr__(str(self)) + + def __str__(self): + return 'Exchange %s(%s)' % (self.name, self.type) @property def can_cache_declaration(self): @@ -289,12 +288,13 @@ class binding(object): self.exchange = exchange self.routing_key = routing_key self.arguments = arguments - self.unbind_arguments = None + self.unbind_arguments = unbind_arguments def declare(self, channel, nowait=False): """Declare destination exchange.""" if self.exchange and self.exchange.name: - self.exchange(channel).declare(nowait=nowait) + ex = self.exchange(channel) + ex.declare(nowait=nowait) def bind(self, entity, nowait=False): """Bind entity to this binding.""" @@ -305,11 +305,14 @@ class binding(object): def unbind(self, entity, nowait=False): """Unbind entity from this binding.""" - entity.unbind_from(exchange=self.exchange, + entity.unbind_from(self.exchange, routing_key=self.routing_key, arguments=self.unbind_arguments, nowait=nowait) + def __repr__(self): + return '<binding: %r -> %r>' % (self.exchange.name, self.routing_key) + class Queue(MaybeChannelBound): """A Queue declaration. @@ -423,19 +426,22 @@ class Queue(MaybeChannelBound): ('exclusive', bool), ('auto_delete', bool), ('no_ack', None), - ('alias', None)) + ('alias', None), + ('bindings', None)) def __init__(self, name='', exchange=None, routing_key='', channel=None, - **kwargs): + bindings=None, **kwargs): super(Queue, self).__init__(**kwargs) self.name = name or self.name self.exchange = exchange or self.exchange self.routing_key = routing_key or self.routing_key - self.bindings = set() + self.bindings = set(bindings or []) # allows Queue('name', [binding(...), binding(...), ...]) if isinstance(exchange, (list, tuple, set)): - self.bindings, self.exchange = set(exchange), None + self.bindings |= set(exchange) + if self.bindings: + self.exchange = None # exclusive implies auto-delete. if self.exclusive: @@ -457,9 +463,7 @@ class Queue(MaybeChannelBound): self.exchange.declare(nowait) self.queue_declare(nowait, passive=False) - if self.name: - # name should be set by queue_declare in the case - # of anonymous queues. + if self.exchange is not None: self.queue_bind(nowait) # - declare extra/multi-bindings. diff --git a/kombu/exceptions.py b/kombu/exceptions.py index dadc740a..2bf57f37 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -4,9 +4,6 @@ kombu.exceptions Exceptions. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/messaging.py b/kombu/messaging.py index 7b16628a..e2ee2d6e 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -4,9 +4,6 @@ kombu.messaging Sending and receiving messages. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -202,10 +199,12 @@ class Producer(object): def revive(self, channel): """Revive the producer after connection loss.""" if is_connection(channel): - promise = ChannelPromise(lambda: channel.default_channel) - self.__connection__ = channel - self._channel = promise - self.exchange = self.exchange(promise) + connection = channel + self.__connection__ = connection + channel = ChannelPromise(lambda: connection.default_channel) + if isinstance(channel, ChannelPromise): + self._channel = channel + self.exchange = self.exchange(channel) else: # Channel already concrete self._channel = channel @@ -277,7 +276,7 @@ class Consumer(object): #: The connection/channel to use for this consumer. channel = None - #: A single :class:`~kombu.entity.Queue`, or a list of queues to + #: A single :class:`~kombu.Queue`, or a list of queues to #: consume from. queues = None @@ -536,7 +535,7 @@ class Consumer(object): if m2p: message = m2p(message) decoded = None if on_m else message.decode() - except Exception as exc: + except Exception, exc: if not self.on_decode_error: raise self.on_decode_error(message, exc) diff --git a/kombu/mixins.py b/kombu/mixins.py index 818b5563..5b0dcbf4 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -4,9 +4,6 @@ kombu.mixins Useful mixin classes. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/pidbox.py b/kombu/pidbox.py index e3998fd4..3723a4a1 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -4,9 +4,6 @@ kombu.pidbox Generic process mailbox. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -18,11 +15,10 @@ from itertools import count from threading import local from time import time +from . import Exchange, Queue, Consumer, Producer from .clocks import LamportClock from .common import maybe_declare, oid_from -from .entity import Exchange, Queue from .five import range -from .messaging import Consumer, Producer from .utils import cached_property, kwdict, uuid REPLY_QUEUE_EXPIRES = 10 @@ -83,7 +79,7 @@ class Node(object): reply = handle(method, kwdict(arguments)) except SystemExit: raise - except Exception as exc: + except Exception, exc: reply = {'error': repr(exc)} if reply_to: diff --git a/kombu/pools.py b/kombu/pools.py index 0cd03c41..0aba7dc1 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -4,9 +4,6 @@ kombu.pools Public resource pools. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -41,7 +38,12 @@ class ProducerPool(Resource): return self.connections.acquire(block=True) def create_producer(self): - return self.Producer(self._acquire_connection()) + conn = self._acquire_connection() + try: + return self.Producer(conn) + except BaseException: + conn.release() + raise def new(self): return lambda: self.create_producer() @@ -51,11 +53,19 @@ class ProducerPool(Resource): for _ in range(self.limit): self._resource.put_nowait(self.new()) + def close_resource(self, resource): + pass + def prepare(self, p): if isinstance(p, Callable): p = p() - if not p.channel: - p.revive(self._acquire_connection()) + if p._channel is None: + conn = self._acquire_connection() + try: + p.revive(conn) + except BaseException: + conn.release() + raise return p def release(self, resource): diff --git a/kombu/serialization.py b/kombu/serialization.py index 391c6a22..b1256164 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -5,9 +5,6 @@ kombu.serialization Serialization utilities. -:copyright: (c) 2009 - 2012 by Ask Solem -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -287,7 +284,7 @@ def register_json(): from anyjson import loads, dumps def _loads(obj): - if isinstance(obj, bytes): + if isinstance(obj, bytes_t): obj = obj.decode() return loads(obj) diff --git a/kombu/simple.py b/kombu/simple.py index 89d756a2..5f8a3e90 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -4,9 +4,6 @@ kombu.simple Simple interface. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/syn.py b/kombu/syn.py index 7983f41c..7f6e8099 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -2,9 +2,6 @@ kombu.syn ========= -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 6e1be5eb..c3f1e882 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -22,6 +22,7 @@ class Message(base.Message): class Channel(base.StdChannel): open = True throw_decode_error = False + _ids = count(1) def __init__(self, connection): self.connection = connection @@ -29,6 +30,7 @@ class Channel(base.StdChannel): self.deliveries = count(1) self.to_deliver = [] self.events = {'basic_return': set()} + self.channel_id = next(self._ids) def _called(self, name): self.called.append(name) diff --git a/kombu/tests/test_clocks.py b/kombu/tests/test_clocks.py index 27b68f93..ed8c9fa8 100644 --- a/kombu/tests/test_clocks.py +++ b/kombu/tests/test_clocks.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +from heapq import heappush + from kombu.clocks import LamportClock from .utils import TestCase @@ -17,6 +19,7 @@ class test_LamportClock(TestCase): c1.forward() c2.adjust(c1.value) self.assertEqual(c2.value, c1.value + 1) + self.assertTrue(repr(c1)) c2_val = c2.value c2.forward() @@ -26,3 +29,27 @@ class test_LamportClock(TestCase): c1.adjust(c2.value) self.assertEqual(c1.value, c2.value + 1) + + def test_sort(self): + c = LamportClock() + pid1 = 'a.example.com:312' + pid2 = 'b.example.com:311' + + events = [] + + m1 = (c.forward(), pid1) + heappush(events, m1) + m2 = (c.forward(), pid2) + heappush(events, m2) + m3 = (c.forward(), pid1) + heappush(events, m3) + m4 = (30, pid1) + heappush(events, m4) + m5 = (30, pid2) + heappush(events, m5) + + self.assertEqual(str(c), str(c.value)) + + self.assertEqual(c.sort_heap(events), m1) + self.assertEqual(c.sort_heap([m4, m5]), m4) + self.assertEqual(c.sort_heap([m4, m5, m1]), m4) diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 524f60b9..ffcf9424 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -5,13 +5,55 @@ import socket from mock import patch from kombu import common -from kombu.common import (Broadcast, maybe_declare, - send_reply, isend_reply, collect_replies) +from kombu.common import ( + Broadcast, maybe_declare, + send_reply, isend_reply, collect_replies, + declaration_cached, ignore_errors, + QoS, PREFETCH_COUNT_MAX, + entry_to_queue, +) +from kombu.exceptions import StdChannelError from .utils import TestCase from .utils import ContextMock, Mock, MockPool +class test_ignore_errors(TestCase): + + def test_ignored(self): + connection = Mock() + connection.channel_errors = (KeyError, ) + connection.connection_errors = (KeyError, ) + + with ignore_errors(connection): + raise KeyError() + + def raising(): + raise KeyError() + + ignore_errors(connection, raising) + + connection.channel_errors = connection.connection_errors = \ + () + + with self.assertRaises(KeyError): + with ignore_errors(connection): + raise KeyError() + + +class test_declaration_cached(TestCase): + + def test_when_cached(self): + chan = Mock() + chan.connection.client.declared_entities = ['foo'] + self.assertTrue(declaration_cached('foo', chan)) + + def test_when_not_cached(self): + chan = Mock() + chan.connection.client.declared_entities = ['bar'] + self.assertFalse(declaration_cached('foo', chan)) + + class test_Broadcast(TestCase): def test_arguments(self): @@ -46,6 +88,10 @@ class test_maybe_declare(TestCase): maybe_declare(entity, channel) self.assertEqual(entity.declare.call_count, 1) + entity.channel.connection = None + with self.assertRaises(StdChannelError): + maybe_declare(entity) + def test_binds_entities(self): channel = Mock() channel.connection.client.declared_entities = set() @@ -298,3 +344,125 @@ class test_itermessages(TestCase): with self.assertRaises(StopIteration): next(it) + + +class test_entry_to_queue(TestCase): + + def test_calls_Queue_from_dict(self): + with patch('kombu.common.Queue') as Queue: + entry_to_queue('name', exchange='bar') + Queue.from_dict.assert_called_with('name', exchange='bar') + + +class test_QoS(TestCase): + + class _QoS(QoS): + def __init__(self, value): + self.value = value + QoS.__init__(self, None, value) + + def set(self, value): + return value + + def test_qos_exceeds_16bit(self): + with patch('kombu.common.logger') as logger: + callback = Mock() + qos = QoS(callback, 10) + qos.prev = 100 + qos.set(2 ** 32) + self.assertTrue(logger.warn.called) + callback.assert_called_with(prefetch_count=0) + + def test_qos_increment_decrement(self): + qos = self._QoS(10) + self.assertEqual(qos.increment_eventually(), 11) + self.assertEqual(qos.increment_eventually(3), 14) + self.assertEqual(qos.increment_eventually(-30), 14) + self.assertEqual(qos.decrement_eventually(7), 7) + self.assertEqual(qos.decrement_eventually(), 6) + + def test_qos_disabled_increment_decrement(self): + qos = self._QoS(0) + self.assertEqual(qos.increment_eventually(), 0) + self.assertEqual(qos.increment_eventually(3), 0) + self.assertEqual(qos.increment_eventually(-30), 0) + self.assertEqual(qos.decrement_eventually(7), 0) + self.assertEqual(qos.decrement_eventually(), 0) + self.assertEqual(qos.decrement_eventually(10), 0) + + def test_qos_thread_safe(self): + qos = self._QoS(10) + + def add(): + for i in range(1000): + qos.increment_eventually() + + def sub(): + for i in range(1000): + qos.decrement_eventually() + + def threaded(funs): + from threading import Thread + threads = [Thread(target=fun) for fun in funs] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + threaded([add, add]) + self.assertEqual(qos.value, 2010) + + qos.value = 1000 + threaded([add, sub]) # n = 2 + self.assertEqual(qos.value, 1000) + + def test_exceeds_short(self): + qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1) + qos.update() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1) + qos.increment_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX) + qos.increment_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX + 1) + qos.decrement_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX) + qos.decrement_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1) + + def test_consumer_increment_decrement(self): + mconsumer = Mock() + qos = QoS(mconsumer.qos, 10) + qos.update() + self.assertEqual(qos.value, 10) + mconsumer.qos.assert_called_with(prefetch_count=10) + qos.decrement_eventually() + qos.update() + self.assertEqual(qos.value, 9) + mconsumer.qos.assert_called_with(prefetch_count=9) + qos.decrement_eventually() + self.assertEqual(qos.value, 8) + mconsumer.qos.assert_called_with(prefetch_count=9) + self.assertIn({'prefetch_count': 9}, mconsumer.qos.call_args) + + # Does not decrement 0 value + qos.value = 0 + qos.decrement_eventually() + self.assertEqual(qos.value, 0) + qos.increment_eventually() + self.assertEqual(qos.value, 0) + + def test_consumer_decrement_eventually(self): + mconsumer = Mock() + qos = QoS(mconsumer.qos, 10) + qos.decrement_eventually() + self.assertEqual(qos.value, 9) + qos.value = 0 + qos.decrement_eventually() + self.assertEqual(qos.value, 0) + + def test_set(self): + mconsumer = Mock() + qos = QoS(mconsumer.qos, 10) + qos.set(12) + self.assertEqual(qos.prev, 12) + qos.set(qos.prev) diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 9bdf6343..4af9eb37 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -1,7 +1,11 @@ from __future__ import absolute_import +import errno import pickle +import socket +from copy import copy +from mock import patch from nose import SkipTest from kombu import Connection, Consumer, Producer, parse_url @@ -10,6 +14,7 @@ from kombu.five import items from .mocks import Transport from .utils import TestCase + from .utils import Mock, skip_if_not_module @@ -154,6 +159,157 @@ class test_Connection(TestCase): self.assertFalse(_connection.connected) self.assertIsInstance(conn.transport, Transport) + def test_multiple_urls(self): + conn1 = Connection('amqp://foo;amqp://bar') + self.assertEqual(conn1.hostname, 'foo') + self.assertListEqual(conn1.alt, ['amqp://foo', 'amqp://bar']) + + conn2 = Connection(['amqp://foo', 'amqp://bar']) + self.assertEqual(conn2.hostname, 'foo') + self.assertListEqual(conn2.alt, ['amqp://foo', 'amqp://bar']) + + def test_uri_passthrough(self): + from kombu import connection as mod + prev, mod.URI_PASSTHROUGH = mod.URI_PASSTHROUGH, set(['foo']) + try: + with patch('kombu.connection.parse_url') as parse_url: + c = Connection('foo+mysql://some_host') + self.assertEqual(c.transport_cls, 'foo') + self.assertFalse(parse_url.called) + self.assertEqual(c.hostname, 'mysql://some_host') + self.assertTrue(c.as_uri().startswith('foo+')) + with patch('kombu.connection.parse_url') as parse_url: + c = Connection('mysql://some_host', transport='foo') + self.assertEqual(c.transport_cls, 'foo') + self.assertFalse(parse_url.called) + self.assertEqual(c.hostname, 'mysql://some_host') + finally: + mod.URI_PASSTHROUGH = prev + c = Connection('amqp+sqlite://some_host') + self.assertTrue(c.as_uri().startswith('amqp+')) + + def test_default_ensure_callback(self): + with patch('kombu.connection.logger') as logger: + c = Connection(transport=Mock) + c._default_ensure_callback(KeyError(), 3) + self.assertTrue(logger.error.called) + + def test_ensure_connection_on_error(self): + c = Connection('amqp://A;amqp://B') + with patch('kombu.connection.retry_over_time') as rot: + c.ensure_connection() + self.assertTrue(rot.called) + + args = rot.call_args[0] + cb = args[4] + intervals = iter([1, 2, 3, 4, 5]) + self.assertEqual(cb(KeyError(), intervals, 0), 0) + self.assertEqual(cb(KeyError(), intervals, 1), 1) + self.assertEqual(cb(KeyError(), intervals, 2), 0) + self.assertEqual(cb(KeyError(), intervals, 3), 2) + self.assertEqual(cb(KeyError(), intervals, 4), 0) + self.assertEqual(cb(KeyError(), intervals, 5), 3) + self.assertEqual(cb(KeyError(), intervals, 6), 0) + self.assertEqual(cb(KeyError(), intervals, 7), 4) + + errback = Mock() + c.ensure_connection(errback=errback) + args = rot.call_args[0] + cb = args[4] + self.assertEqual(cb(KeyError(), intervals, 0), 0) + self.assertTrue(errback.called) + + def test_drain_nowait(self): + c = Connection(transport=Mock) + c.drain_events = Mock() + c.drain_events.side_effect = socket.timeout() + + c.more_to_read = True + self.assertFalse(c.drain_nowait()) + self.assertFalse(c.more_to_read) + + c.drain_events.side_effect = socket.error() + c.drain_events.side_effect.errno = errno.EAGAIN + c.more_to_read = True + self.assertFalse(c.drain_nowait()) + self.assertFalse(c.more_to_read) + + c.drain_events.side_effect = socket.error() + c.drain_events.side_effect.errno = errno.EPERM + with self.assertRaises(socket.error): + c.drain_nowait() + + c.more_to_read = False + c.drain_events = Mock() + self.assertTrue(c.drain_nowait()) + c.drain_events.assert_called_with(timeout=0) + self.assertTrue(c.more_to_read) + + def test_supports_heartbeats(self): + c = Connection(transport=Mock) + c.transport.supports_heartbeats = False + self.assertFalse(c.supports_heartbeats) + + def test_is_evented(self): + c = Connection(transport=Mock) + c.transport.supports_ev = False + self.assertFalse(c.is_evented) + + def test_eventmap(self): + c = Connection(transport=Mock) + c.transport.eventmap.return_value = {1: 1, 2: 2} + self.assertDictEqual(c.eventmap, {1: 1, 2: 2}) + c.transport.eventmap.assert_called_with(c.connection) + + def test_manager(self): + c = Connection(transport=Mock) + self.assertIs(c.manager, c.transport.manager) + + def test_copy(self): + c = Connection('amqp://example.com') + self.assertEqual(copy(c).info(), c.info()) + + def test_switch(self): + c = Connection('amqp://foo') + c._closed = True + c.switch('redis://example.com//3') + self.assertFalse(c._closed) + self.assertEqual(c.hostname, 'example.com') + self.assertEqual(c.transport_cls, 'redis') + self.assertEqual(c.virtual_host, '/3') + + def test_maybe_switch_next(self): + c = Connection('amqp://foo;redis://example.com//3') + c.maybe_switch_next() + self.assertFalse(c._closed) + self.assertEqual(c.hostname, 'example.com') + self.assertEqual(c.transport_cls, 'redis') + self.assertEqual(c.virtual_host, '/3') + + def test_maybe_switch_next_no_cycle(self): + c = Connection('amqp://foo') + c.maybe_switch_next() + self.assertFalse(c._closed) + self.assertEqual(c.hostname, 'foo') + self.assertIn(c.transport_cls, ('librabbitmq', 'pyamqp')) + + def test_heartbeat_check(self): + c = Connection(transport=Transport) + c.transport.heartbeat_check = Mock() + c.heartbeat_check(3) + c.transport.heartbeat_check.assert_called_with(c.connection, rate=3) + + def test_completes_cycle_no_cycle(self): + c = Connection('amqp://') + self.assertTrue(c.completes_cycle(0)) + self.assertTrue(c.completes_cycle(1)) + + def test_completes_cycle(self): + c = Connection('amqp://a;amqp://b;amqp://c') + self.assertFalse(c.completes_cycle(0)) + self.assertFalse(c.completes_cycle(1)) + self.assertTrue(c.completes_cycle(2)) + def test__enter____exit__(self): conn = self.conn context = conn.__enter__() @@ -357,6 +513,18 @@ class ResourceCase(TestCase): [chan.release() for chan in chans] self.assertState(P, 10, 0) + def test_acquire_prepare_raises(self): + if self.abstract: + return + P = self.create_resource(10, 0) + + self.assertEqual(len(P._resource.queue), 10) + P.prepare = Mock() + P.prepare.side_effect = IOError() + with self.assertRaises(IOError): + P.acquire(block=True) + self.assertEqual(len(P._resource.queue), 10) + def test_acquire_no_limit(self): if self.abstract: return @@ -440,6 +608,12 @@ class test_ConnectionPool(ResourceCase): self.assertIsNotNone(q[1]._connection) self.assertIsNone(q[2]()._connection) + def test_release_no__debug(self): + P = self.create_resource(10, 2) + R = Mock() + R._debug.side_effect = AttributeError() + P.release_resource(R) + def test_setup_no_limit(self): P = self.create_resource(None, None) self.assertFalse(P._resource.queue) diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index cdd9c34a..fadaf358 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -1,7 +1,10 @@ from __future__ import absolute_import -from kombu import Connection -from kombu.entity import Exchange, Queue +import pickle + +from mock import call + +from kombu import Connection, Exchange, Queue, binding from kombu.exceptions import NotBoundError from .mocks import Transport @@ -13,6 +16,48 @@ def get_conn(): return Connection(transport=Transport) +class test_binding(TestCase): + + def test_constructor(self): + x = binding(Exchange('foo'), 'rkey', + arguments={'barg': 'bval'}, + unbind_arguments={'uarg': 'uval'}, + ) + self.assertEqual(x.exchange, Exchange('foo')) + self.assertEqual(x.routing_key, 'rkey') + self.assertDictEqual(x.arguments, {'barg': 'bval'}) + self.assertDictEqual(x.unbind_arguments, {'uarg': 'uval'}) + + def test_declare(self): + chan = get_conn().channel() + x = binding(Exchange('foo'), 'rkey') + x.declare(chan) + self.assertIn('exchange_declare', chan) + + def test_declare_no_exchange(self): + chan = get_conn().channel() + x = binding() + x.declare(chan) + self.assertNotIn('exchange_declare', chan) + + def test_bind(self): + chan = get_conn().channel() + x = binding(Exchange('foo')) + x.bind(Exchange('bar')(chan)) + self.assertIn('exchange_bind', chan) + + def test_unbind(self): + chan = get_conn().channel() + x = binding(Exchange('foo')) + x.unbind(Exchange('bar')(chan)) + self.assertIn('exchange_unbind', chan) + + def test_repr(self): + b = binding(Exchange('foo'), 'rkey') + self.assertIn('foo', repr(b)) + self.assertIn('rkey', repr(b)) + + class test_Exchange(TestCase): def test_bound(self): @@ -24,7 +69,8 @@ class test_Exchange(TestCase): bound = exchange.bind(chan) self.assertTrue(bound.is_bound) self.assertIs(bound.channel, chan) - self.assertIn('<bound', repr(bound)) + self.assertIn('bound to chan:%r' % (chan.channel_id, ), + repr(bound)) def test_hash(self): self.assertEqual(hash(Exchange('a')), hash(Exchange('a'))) @@ -34,6 +80,11 @@ class test_Exchange(TestCase): self.assertTrue(Exchange('a', durable=True).can_cache_declaration) self.assertFalse(Exchange('a', durable=False).can_cache_declaration) + def test_pickle(self): + e1 = Exchange('foo', 'direct') + e2 = pickle.loads(pickle.dumps(e1)) + self.assertEqual(e1, e2) + def test_eq(self): e1 = Exchange('foo', 'direct') e2 = Exchange('foo', 'direct') @@ -111,6 +162,12 @@ class test_Exchange(TestCase): foo(chan).bind_to(bar) self.assertIn('exchange_bind', chan) + def test_bind_to_by_name(self): + chan = get_conn().channel() + foo = Exchange('foo', 'topic') + foo(chan).bind_to('bar') + self.assertIn('exchange_bind', chan) + def test_unbind_from(self): chan = get_conn().channel() foo = Exchange('foo', 'topic') @@ -118,6 +175,12 @@ class test_Exchange(TestCase): foo(chan).unbind_from(bar) self.assertIn('exchange_unbind', chan) + def test_unbind_from_by_name(self): + chan = get_conn().channel() + foo = Exchange('foo', 'topic') + foo(chan).unbind_from('bar') + self.assertIn('exchange_unbind', chan) + class test_Queue(TestCase): @@ -128,6 +191,14 @@ class test_Queue(TestCase): self.assertEqual(hash(Queue('a')), hash(Queue('a'))) self.assertNotEqual(hash(Queue('a')), hash(Queue('b'))) + def test_anonymous(self): + chan = Mock() + x = Queue(bindings=[binding(Exchange('foo'), 'rkey')]) + chan.queue_declare.return_value = 'generated', 0, 0 + xx = x(chan) + xx.declare() + self.assertEqual(xx.name, 'generated') + def test_when_bound_but_no_exchange(self): q = Queue('a') q.exchange = None @@ -141,7 +212,37 @@ class test_Queue(TestCase): q.declare() q.queue_declare.assert_called_with(False, passive=False) - q.queue_bind.assert_called_with(False) + + def test_bind_to_when_name(self): + chan = Mock() + q = Queue('a') + q(chan).bind_to('ex') + self.assertTrue(chan.queue_bind.called) + + def test_get_when_no_m2p(self): + chan = Mock() + q = Queue('a')(chan) + chan.message_to_python = None + self.assertTrue(q.get()) + + def test_multiple_bindings(self): + chan = Mock() + q = Queue('mul', [ + binding(Exchange('mul1'), 'rkey1'), + binding(Exchange('mul2'), 'rkey2'), + binding(Exchange('mul3'), 'rkey3'), + ]) + q(chan).declare() + self.assertIn( + call(nowait=False, + exchange='mul1', + auto_delete=False, + passive=False, + arguments=None, + type='direct', + durable=True, + ), chan.exchange_declare.call_args_list, + ) def test_can_cache_declaration(self): self.assertTrue(Queue('a', durable=True).can_cache_declaration) diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 4c5fe619..249af87f 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,13 +1,14 @@ from __future__ import absolute_import, unicode_literals import anyjson +import pickle +from collections import defaultdict from mock import patch -from kombu import Connection +from kombu import Connection, Consumer, Producer, Exchange, Queue from kombu.exceptions import MessageStateError -from kombu.messaging import Consumer, Producer -from kombu.entity import Exchange, Queue +from kombu.utils import ChannelPromise from .mocks import Transport from .utils import TestCase @@ -23,6 +24,16 @@ class test_Producer(TestCase): self.assertTrue(self.connection.connection.connected) self.assertFalse(self.exchange.is_bound) + def test_pickle(self): + chan = Mock() + producer = Producer(chan, serializer='pickle') + p2 = pickle.loads(pickle.dumps(producer)) + self.assertEqual(p2.serializer, producer.serializer) + + def test_no_channel(self): + p = Producer(None) + self.assertFalse(p._channel) + @patch('kombu.common.maybe_declare') def test_maybe_declare(self, maybe_declare): p = self.connection.Producer() @@ -109,11 +120,25 @@ class test_Producer(TestCase): def test_publish_with_Exchange_instance(self): p = self.connection.Producer() p.channel = Mock() - p.publish('hello', exchange=Exchange('foo')) + p.publish('hello', exchange=Exchange('foo'), delivery_mode='transient') self.assertEqual( p._channel.basic_publish.call_args[1]['exchange'], 'foo', ) + def test_set_on_return(self): + chan = Mock() + chan.events = defaultdict(Mock) + p = Producer(ChannelPromise(lambda: chan), on_return='on_return') + p.channel + chan.events['basic_return'].add.assert_called_with('on_return') + + def test_publish_retry_calls_ensure(self): + p = Producer(Mock()) + p._connection = Mock() + ensure = p.connection.ensure = Mock() + p.publish('foo', exchange='foo', retry=True) + self.assertTrue(ensure.called) + def test_publish_retry_with_declare(self): p = self.connection.Producer() p.maybe_declare = Mock() @@ -195,6 +220,12 @@ class test_Consumer(TestCase): self.assertTrue(self.connection.connection.connected) self.exchange = Exchange('foo', 'direct') + def test_set_no_channel(self): + c = Consumer(None) + self.assertIsNone(c.channel) + c.revive(Mock()) + self.assertTrue(c.channel) + def test_set_no_ack(self): channel = self.connection.channel() queue = Queue('qname', self.exchange, 'rkey') diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index 32d1294f..a9386602 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -1,9 +1,8 @@ from __future__ import absolute_import -from kombu import Connection +from kombu import Connection, Producer from kombu import pools from kombu.connection import ConnectionPool -from kombu.messaging import Producer from kombu.utils import eqhash from .utils import TestCase @@ -26,6 +25,34 @@ class test_ProducerPool(TestCase): self.connections = Mock() self.pool = self.Pool(self.connections, limit=10) + def test_releases_connection_when_Producer_raises(self): + self.pool.Producer = Mock() + self.pool.Producer.side_effect = IOError() + acq = self.pool._acquire_connection = Mock() + conn = acq.return_value = Mock() + with self.assertRaises(IOError): + self.pool.create_producer() + conn.release.assert_called_with() + + def test_prepare_release_connection_on_error(self): + pp = Mock() + p = pp.return_value = Mock() + p.revive.side_effect = IOError() + acq = self.pool._acquire_connection = Mock() + conn = acq.return_value = Mock() + p._channel = None + with self.assertRaises(IOError): + self.pool.prepare(pp) + conn.release.assert_called_with() + + def test_release_releases_connection(self): + p = Mock() + p.__connection__ = Mock() + self.pool.release(p) + p.__connection__.release.assert_called_with() + p.__connection__ = None + self.pool.release(p) + def test_init(self): self.assertIs(self.pool.connections, self.connections) @@ -57,7 +84,7 @@ class test_ProducerPool(TestCase): def test_prepare(self): connection = self.connections.acquire.return_value = Mock() pool = self.MyPool(self.connections, limit=10) - pool.instance.channel = None + pool.instance._channel = None first = pool._resource.get_nowait() producer = pool.prepare(first) self.assertTrue(self.connections.acquire.called) @@ -66,7 +93,7 @@ class test_ProducerPool(TestCase): def test_prepare_channel_already_created(self): self.connections.acquire.return_value = Mock() pool = self.MyPool(self.connections, limit=10) - pool.instance.channel = Mock() + pool.instance._channel = Mock() first = pool._resource.get_nowait() self.connections.acquire.reset() producer = pool.prepare(first) diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index 3633f8b1..38b0f581 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -5,12 +5,15 @@ import sys from functools import wraps from io import BytesIO, StringIO +from mock import Mock, patch from kombu import utils from kombu.five import string_t -from .utils import redirect_stdouts, mask_modules, skip_if_module -from .utils import TestCase +from .utils import ( + TestCase, + redirect_stdouts, mask_modules, module_exists, skip_if_module, +) class OldString(object): @@ -28,6 +31,13 @@ class OldString(object): return self.value.rsplit(*args, **kwargs) +class test_kombu_module(TestCase): + + def test_dir(self): + import kombu + self.assertTrue(dir(kombu)) + + class test_utils(TestCase): def test_maybe_list(self): @@ -174,10 +184,21 @@ class test_retry_over_time(TestCase): @insomnia def test_simple(self): - x = utils.retry_over_time(self.myfun, self.Predicate, - errback=self.errback, interval_max=14) - self.assertEqual(x, 42) - self.assertEqual(self.index, 9) + prev_count, utils.count = utils.count, Mock() + try: + utils.count.return_value = range(1) + x = utils.retry_over_time(self.myfun, self.Predicate, + errback=None, interval_max=14) + self.assertIsNone(x) + utils.count.return_value = range(10) + cb = Mock() + x = utils.retry_over_time(self.myfun, self.Predicate, + errback=self.errback, callback=cb, interval_max=14) + self.assertEqual(x, 42) + self.assertEqual(self.index, 9) + cb.assert_called_with() + finally: + utils.count = prev_count @insomnia def test_retry_once(self): @@ -200,6 +221,26 @@ class test_retry_over_time(TestCase): class test_cached_property(TestCase): + def test_deleting(self): + + class X(object): + xx = False + + @utils.cached_property + def foo(self): + return 42 + + @foo.deleter # noqa + def foo(self, value): + self.xx = value + + x = X() + del(x.foo) + self.assertFalse(x.xx) + x.__dict__['foo'] = 'here' + del(x.foo) + self.assertEqual(x.xx, 'here') + def test_when_access_from_class(self): class X(object): @@ -226,3 +267,75 @@ class test_cached_property(TestCase): self.assertEqual(x.xx, 10) del(x.foo) + + +class test_symbol_by_name(TestCase): + + def test_instance_returns_instance(self): + instance = object() + self.assertIs(utils.symbol_by_name(instance), instance) + + def test_returns_default(self): + default = object() + self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', + default=default), default) + + def test_no_default(self): + with self.assertRaises(ImportError): + utils.symbol_by_name('xyz.ryx.qedoa.weq:foz') + + def test_imp_reraises_ValueError(self): + imp = Mock() + imp.side_effect = ValueError() + with self.assertRaises(ValueError): + utils.symbol_by_name('kombu.Connection', imp=imp) + + def test_package(self): + from kombu.entity import Exchange + self.assertIs(utils.symbol_by_name('.entity:Exchange', + package='kombu'), Exchange) + self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu')) + + +class test_ChannelPromise(TestCase): + + def test_repr(self): + self.assertEqual(repr(utils.ChannelPromise(lambda: 'foo')), + "<promise: 'foo'>") + + +class test_entrypoints(TestCase): + + @mask_modules('pkg_resources') + def test_without_pkg_resources(self): + self.assertListEqual(list(utils.entrypoints('kombu.test')), []) + + @module_exists('pkg_resources') + def test_with_pkg_resources(self): + with patch('pkg_resources.iter_entry_points', create=True) as iterep: + eps = iterep.return_value = [Mock(), Mock()] + + self.assertTrue(list(utils.entrypoints('kombu.test'))) + iterep.assert_called_with('kombu.test') + eps[0].load.assert_called_with() + eps[1].load.assert_called_with() + + +class test_shufflecycle(TestCase): + + def test_shuffles(self): + prev_repeat, utils.repeat = utils.repeat, Mock() + try: + utils.repeat.return_value = range(10) + values = set(['A', 'B', 'C']) + cycle = utils.shufflecycle(values) + seen = set() + for i in xrange(10): + cycle.next() + utils.repeat.assert_called_with(None) + self.assertTrue(seen.issubset(values)) + with self.assertRaises(StopIteration): + cycle.next() + cycle.next() + finally: + utils.repeat = prev_repeat diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py index 6ce95cc9..fb6c0141 100644 --- a/kombu/tests/transport/test_amqplib.py +++ b/kombu/tests/transport/test_amqplib.py @@ -4,7 +4,7 @@ import sys from nose import SkipTest -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase from kombu.tests.utils import mask_modules, Mock diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py index 6a8d64d1..7bdad8f5 100644 --- a/kombu/tests/transport/test_filesystem.py +++ b/kombu/tests/transport/test_filesystem.py @@ -4,9 +4,7 @@ import tempfile from nose import SkipTest -from kombu.connection import Connection -from kombu.entity import Exchange, Queue -from kombu.messaging import Consumer, Producer +from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.tests.utils import TestCase diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py index 95e3432e..970681d7 100644 --- a/kombu/tests/transport/test_memory.py +++ b/kombu/tests/transport/test_memory.py @@ -2,9 +2,7 @@ from __future__ import absolute_import import socket -from kombu.connection import Connection -from kombu.entity import Exchange, Queue -from kombu.messaging import Consumer, Producer +from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.tests.utils import TestCase diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py index b05e7d9f..522df04a 100644 --- a/kombu/tests/transport/test_mongodb.py +++ b/kombu/tests/transport/test_mongodb.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from nose import SkipTest -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase, skip_if_not_module diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py index b48ec153..7936484f 100644 --- a/kombu/tests/transport/test_pyamqp.py +++ b/kombu/tests/transport/test_pyamqp.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import sys +from mock import patch from nose import SkipTest try: @@ -10,7 +11,7 @@ except ImportError: pyamqp = None # noqa else: from kombu.transport import pyamqp -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase from kombu.tests.utils import mask_modules, Mock @@ -160,3 +161,26 @@ class test_pyamqp(TestCase): c = Connection(port=1337, transport=Transport).connect() self.assertEqual(c['host'], '127.0.0.1:1337') + + def test_eventmap(self): + t = pyamqp.Transport(Mock()) + conn = Mock() + self.assertDictEqual(t.eventmap(conn), + {conn.sock: t.client.drain_nowait}) + + def test_event_interface(self): + t = pyamqp.Transport(Mock()) + t.on_poll_init(Mock()) + t.on_poll_start() + + def test_heartbeat_check(self): + t = pyamqp.Transport(Mock()) + conn = Mock() + t.heartbeat_check(conn, rate=4.331) + conn.heartbeat_tick.assert_called_with(rate=4.331) + + def test_get_manager(self): + with patch('kombu.transport.pyamqp.get_manager') as get_manager: + t = pyamqp.Transport(Mock()) + t.get_manager(1, kw=2) + get_manager.assert_called_with(t.client, 1, kw=2) diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 403e815e..cf166e87 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -7,11 +7,9 @@ from anyjson import dumps from collections import defaultdict from itertools import count -from kombu.connection import Connection -from kombu.entity import Exchange, Queue +from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, Queue as _Queue -from kombu.messaging import Consumer, Producer from kombu.utils import eventio # patch poll from kombu.tests.utils import TestCase @@ -42,9 +40,7 @@ class Client(object): hashes = defaultdict(dict) shard_hint = None - def __init__(self, db=None, port=None, **kwargs): - self.port = port - self.db = db + def __init__(self, db=None, port=None, connection_pool=None, **kwargs): self._called = [] self._connection = None self.bgsave_raises_ResponseError = False @@ -384,10 +380,9 @@ class test_Channel(TestCase): c.connection.disconnect.assert_called_with() def test_invalid_database_raises_ValueError(self): - self.channel.connection.client.virtual_host = 'xfeqwewkfk' with self.assertRaises(ValueError): - self.channel._create_client() + Connection('redis:///dwqwewqe').channel() @skip_if_not_module('redis') def test_get_client(self): @@ -395,7 +390,7 @@ class test_Channel(TestCase): KombuRedis = redis.Channel._get_client(self.channel) self.assertTrue(KombuRedis) - Rv = getattr(R, 'VERSION') + Rv = getattr(R, 'VERSION', None) try: R.VERSION = (2, 4, 0) with self.assertRaises(VersionMismatch): @@ -528,29 +523,23 @@ class test_Redis(TestCase): channel.close() def test_db_values(self): - c1 = Connection(virtual_host=1, - transport=Transport).channel() - self.assertEqual(c1.client.db, 1) + Connection(virtual_host=1, + transport=Transport).channel() - c2 = Connection(virtual_host='1', - transport=Transport).channel() - self.assertEqual(c2.client.db, 1) + Connection(virtual_host='1', + transport=Transport).channel() - c3 = Connection(virtual_host='/1', - transport=Transport).channel() - self.assertEqual(c3.client.db, 1) + Connection(virtual_host='/1', + transport=Transport).channel() with self.assertRaises(Exception): - Connection(virtual_host='/foo', - transport=Transport).channel() + Connection('redis:///foo').channel() def test_db_port(self): c1 = Connection(port=None, transport=Transport).channel() - self.assertEqual(c1.client.port, Transport.default_port) c1.close() c2 = Connection(port=9999, transport=Transport).channel() - self.assertEqual(c2.client.port, 9999) c2.close() def test_close_poller_not_active(self): diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index 82b51ebc..f3d7d239 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -3,7 +3,7 @@ from __future__ import absolute_import from mock import patch from nose import SkipTest -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index eb7eaf6d..232be80b 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -91,6 +91,9 @@ class test_QoS(TestCase): self.assertTrue(stderr.getvalue()) self.assertFalse(stdout.getvalue()) + self.q.restore_at_shutdown = False + self.q.restore_unacked_once() + def test_get(self): self.q._delivered['foo'] = 1 self.assertEqual(self.q.get('foo'), 1) @@ -176,10 +179,34 @@ class test_Channel(TestCase): if self.channel._qos is not None: self.channel._qos._on_collect.cancel() + def test_exchange_bind_interface(self): + with self.assertRaises(NotImplementedError): + self.channel.exchange_bind('dest', 'src', 'key') + + def test_exchange_unbind_interface(self): + with self.assertRaises(NotImplementedError): + self.channel.exchange_unbind('dest', 'src', 'key') + + def test_queue_unbind_interface(self): + with self.assertRaises(NotImplementedError): + self.channel.queue_unbind('dest', 'ex', 'key') + + def test_management(self): + m = self.channel.connection.client.get_manager() + self.assertTrue(m) + m.get_bindings() + m.close() + def test_exchange_declare(self): c = self.channel + + with self.assertRaises(StdChannelError): + c.exchange_declare('test_exchange_declare', 'direct', + durable=True, auto_delete=True, passive=True) c.exchange_declare('test_exchange_declare', 'direct', durable=True, auto_delete=True) + c.exchange_declare('test_exchange_declare', 'direct', + durable=True, auto_delete=True, passive=True) self.assertIn('test_exchange_declare', c.state.exchanges) # can declare again with same values c.exchange_declare('test_exchange_declare', 'direct', @@ -348,7 +375,7 @@ class test_Channel(TestCase): exc = None try: raise KeyError() - except KeyError as exc_: + except KeyError, exc_: exc = exc_ ru.return_value = [(exc, 1)] diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py new file mode 100644 index 00000000..8f85a2c3 --- /dev/null +++ b/kombu/tests/utilities/test_amq_manager.py @@ -0,0 +1,35 @@ +from __future__ import absolute_import + +from mock import patch + +from kombu import Connection +from kombu.tests.utils import TestCase, mask_modules, module_exists + + +class test_get_manager(TestCase): + + @mask_modules('pyrabbit') + def test_without_pyrabbit(self): + with self.assertRaises(ImportError): + Connection('amqp://').get_manager() + + @module_exists('pyrabbit') + def test_with_pyrabbit(self): + with patch('pyrabbit.Client', create=True) as Client: + manager = Connection('amqp://').get_manager() + self.assertIsNotNone(manager) + Client.assert_called_with('localhost:55672', + 'guest', 'guest') + + @module_exists('pyrabbit') + def test_transport_options(self): + with patch('pyrabbit.Client', create=True) as Client: + manager = Connection('amqp://', transport_options={ + 'manager_hostname': 'admin.mq.vandelay.com', + 'manager_port': 808, + 'manager_userid': 'george', + 'manager_password': 'bosco', + }).get_manager() + self.assertIsNotNone(manager) + Client.assert_called_with('admin.mq.vandelay.com:808', + 'george', 'bosco') diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py new file mode 100644 index 00000000..d364400f --- /dev/null +++ b/kombu/tests/utilities/test_debug.py @@ -0,0 +1,58 @@ +from __future__ import absolute_import + +import logging + +from mock import Mock, patch + +from kombu.utils.debug import ( + setup_logging, + Logwrapped, +) +from kombu.tests.utils import TestCase + + +class test_setup_logging(TestCase): + + def test_adds_handlers_sets_level(self): + with patch('kombu.utils.debug.get_logger') as get_logger: + logger = get_logger.return_value = Mock() + setup_logging(loggers=['kombu.test']) + + get_logger.assert_called_with('kombu.test') + + self.assertTrue(logger.addHandler.called) + logger.setLevel.assert_called_with(logging.DEBUG) + + +class test_Logwrapped(TestCase): + + def test_wraps(self): + with patch('kombu.utils.debug.get_logger') as get_logger: + logger = get_logger.return_value = Mock() + + W = Logwrapped(Mock(), 'kombu.test') + get_logger.assert_called_with('kombu.test') + self.assertIsNotNone(W.instance) + self.assertIs(W.logger, logger) + + W.instance.__repr__ = lambda s: 'foo' + self.assertEqual(repr(W), 'foo') + self.assertListEqual(dir(W), dir(W.instance)) + + W.instance.some_attr = 303 + self.assertEqual(W.some_attr, 303) + + W.instance.some_method.__name__ = 'some_method' + W.some_method(1, 2, kw=1) + W.instance.some_method.assert_called_with(1, 2, kw=1) + + W.some_method() + W.instance.some_method.assert_called_with() + + W.some_method(kw=1) + W.instance.some_method.assert_called_with(kw=1) + + W.ident = 'ident' + W.some_method(kw=1) + self.assertTrue(logger.debug.called) + self.assertIn('ident', logger.debug.call_args[0][0]) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 2020b41a..12d92226 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -4,9 +4,6 @@ kombu.transport.SQS Amazon SQS transport. -:copyright: (c) 2010 - 2012 by Ask Solem -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -273,7 +270,7 @@ class Channel(virtual.Channel): if conn: try: conn.close() - except AttributeError as exc: # FIXME ??? + except AttributeError, exc: # FIXME ??? if "can't set attribute" not in str(exc): raise diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index e8f01486..e14015ef 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -4,9 +4,6 @@ kombu.transport Built-in transports. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -51,8 +48,6 @@ TRANSPORT_ALIASES = { 'amqp': 'kombu.transport.pyamqp:Transport', 'pyamqp': 'kombu.transport.pyamqp:Transport', 'librabbitmq': 'kombu.transport.librabbitmq:Transport', - 'pika': 'kombu.transport.pika2:Transport', - 'oldpika': 'kombu.transport.pika:SyncTransport', 'memory': 'kombu.transport.memory:Transport', 'redis': 'kombu.transport.redis:Transport', 'SQS': 'kombu.transport.SQS:Transport', diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 9f9f467d..1f9697fb 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -4,9 +4,6 @@ kombu.transport.amqplib amqplib transport. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -59,7 +56,7 @@ class TCPTransport(transport.TCPTransport): while len(self._read_buffer) < n: try: s = self.sock.recv(65536) - except socket.error as exc: + except socket.error, exc: if not initial and exc.errno in (errno.EAGAIN, errno.EINTR): continue raise @@ -108,7 +105,7 @@ class SSLTransport(transport.SSLTransport): while len(result) < n: try: s = self.sslobj.read(n - len(result)) - except socket.error as exc: + except socket.error, exc: if not initial and exc.errno in (errno.EAGAIN, errno.EINTR): continue raise @@ -192,7 +189,7 @@ class Connection(amqp.Connection): # pragma: no cover try: try: return self.method_reader.read_method() - except SSLError as exc: + except SSLError, exc: # http://bugs.python.org/issue10272 if 'timed out' in str(exc): raise socket.timeout() diff --git a/kombu/transport/base.py b/kombu/transport/base.py index c402f2f7..ff7a1640 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -4,9 +4,6 @@ kombu.transport.base Base transport interface. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -107,14 +104,14 @@ class Message(object): def ack_log_error(self, logger, errors): try: self.ack() - except errors as exc: + except errors, exc: logger.critical("Couldn't ack %r, reason:%r", self.delivery_tag, exc, exc_info=True) def reject_log_error(self, logger, errors): try: self.reject() - except errors as exc: + except errors, exc: logger.critical("Couldn't ack %r, reason: %r", self.delivery_tag, exc, exc_info=True) diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index 7578c55a..3548a8c0 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -6,9 +6,6 @@ kombu.transport.librabbitmq .. _`librabbitmq`: http://pypi.python.org/librabbitmq/ -:copyright: (c) 2010 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -116,6 +113,9 @@ class Transport(base.Transport): """Close the AMQP broker connection.""" connection.close() + def verify_connection(self, connection): + return connection.connected + def on_poll_init(self, poller): pass diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index b8fe4058..1c67956d 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -4,9 +4,6 @@ kombu.transport.memory In-memory transport. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 18c49971..91603688 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -55,7 +55,7 @@ class Channel(virtual.Channel): msg = self.client.command('findandmodify', 'messages', query={'queue': queue}, sort={'_id': pymongo.ASCENDING}, remove=True) - except errors.OperationFailure as exc: + except errors.OperationFailure, exc: if 'No matching object found' in exc.args[0]: raise Empty() raise diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py deleted file mode 100644 index ce60dd6c..00000000 --- a/kombu/transport/pika.py +++ /dev/null @@ -1,262 +0,0 @@ -""" -kombu.transport.pika -==================== - -Pika transport. - -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - -""" -from __future__ import absolute_import - -import socket - -from operator import attrgetter - -from kombu.exceptions import ( - StdConnectionError, - StdChannelError, - VersionMismatch, -) - -from . import base - -from pika import channel # must be here to raise import error -try: - from pika import asyncore_adapter -except ImportError: - raise VersionMismatch('Kombu only works with pika version 0.5.2') -from pika import blocking_adapter -from pika import connection -from pika import exceptions -from pika.spec import Basic, BasicProperties - - -DEFAULT_PORT = 5672 - - -BASIC_PROPERTIES = ('content_type', 'content_encoding', - 'headers', 'delivery_mode', 'priority', - 'correlation_id', 'reply_to', 'expiration', - 'message_id', 'timestamp', 'type', 'user_id', - 'app_id', 'cluster_id') - - -class Message(base.Message): - - def __init__(self, channel, amqp_message, **kwargs): - channel_id, method, props, body = amqp_message - propdict = dict(zip(BASIC_PROPERTIES, - attrgetter(*BASIC_PROPERTIES)(props))) - - kwargs.update({'body': body, - 'delivery_tag': method.delivery_tag, - 'content_type': props.content_type, - 'content_encoding': props.content_encoding, - 'headers': props.headers, - 'properties': propdict, - 'delivery_info': dict( - consumer_tag=getattr(method, 'consumer_tag', None), - routing_key=method.routing_key, - delivery_tag=method.delivery_tag, - redelivered=method.redelivered, - exchange=method.exchange)}) - - super(Message, self).__init__(channel, **kwargs) - - -class Channel(channel.Channel, base.StdChannel): - Message = Message - - def basic_get(self, queue, no_ack): - method = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack) - # pika returns semi-predicates (GetEmpty/GetOk). - if isinstance(method, Basic.GetEmpty): - return - return None, method, method._properties, method._body - - def queue_purge(self, queue=None, nowait=False): - return channel.Channel.queue_purge(self, queue=queue, nowait=nowait) \ - .message_count - - def basic_publish(self, message, exchange, routing_key, mandatory=False, - immediate=False): - message_data, properties = message - try: - return channel.Channel.basic_publish(self, - exchange, - routing_key, - message_data, - properties, - mandatory, - immediate) - finally: - # Pika does not automatically flush the outbound buffer - # TODO async: Needs to support `nowait`. - self.handler.connection.flush_outbound() - - def basic_consume(self, queue, no_ack=False, consumer_tag=None, - callback=None, nowait=False): - - # Kombu callbacks only take a single `message` argument, - # but pika applies with 4 arguments, so need to wrap - # these into a single tuple. - def _callback_decode(channel, method, header, body): - return callback((channel, method, header, body)) - - return channel.Channel.basic_consume(self, _callback_decode, - queue, no_ack, - False, consumer_tag) - - def prepare_message(self, body, priority=None, - content_type=None, content_encoding=None, headers=None, - properties=None): - properties = BasicProperties(priority=priority, - content_type=content_type, - content_encoding=content_encoding, - headers=headers, - **properties) - return body, properties - - def message_to_python(self, raw_message): - return self.Message(channel=self, amqp_message=raw_message) - - def basic_ack(self, delivery_tag): - return channel.Channel.basic_ack(self, delivery_tag) - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - self.close() - - def close(self): - super(Channel, self).close() - if getattr(self, 'handler', None): - if getattr(self.handler, 'connection', None): - self.handler.connection.channels.pop( - self.handler.channel_number, None) - self.handler.connection = None - self.handler = None - - @property - def channel_id(self): - return self.channel_number - - -class BlockingConnection(blocking_adapter.BlockingConnection): - Super = blocking_adapter.BlockingConnection - - def __init__(self, client, *args, **kwargs): - self.client = client - self.Super.__init__(self, *args, **kwargs) - - def channel(self): - c = Channel(channel.ChannelHandler(self)) - c.connection = self - return c - - def close(self): - self.client = None - self.Super.close(self) - - def ensure_drain_events(self, timeout=None): - return self.drain_events(timeout=timeout) - - -class AsyncoreConnection(asyncore_adapter.AsyncoreConnection): - _event_counter = 0 - Super = asyncore_adapter.AsyncoreConnection - - def __init__(self, client, *args, **kwargs): - self.client = client - self.Super.__init__(self, *args, **kwargs) - - def channel(self): - c = Channel(channel.ChannelHandler(self)) - c.connection = self - return c - - def ensure_drain_events(self, timeout=None): - # asyncore connection does not raise socket.timeout when timing out - # so need to do a little trick here to mimic the behavior - # of sync connection. - current_events = self._event_counter - self.drain_events(timeout=timeout) - if timeout and self._event_counter <= current_events: - raise socket.timeout('timed out') - - def on_data_available(self, buf): - self._event_counter += 1 - self.Super.on_data_available(self, buf) - - def close(self): - self.client = None - self.Super.close(self) - - -class SyncTransport(base.Transport): - Message = Message - Connection = BlockingConnection - - default_port = DEFAULT_PORT - connection_errors = (StdConnectionError, - socket.error, - exceptions.ConnectionClosed, - exceptions.ChannelClosed, - exceptions.LoginError, - exceptions.NoFreeChannels, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.RecursiveOperationDetected, - exceptions.ContentTransmissionForbidden, - exceptions.ProtocolSyntaxError) - channel_errors = (StdChannelError, - exceptions.ChannelClosed, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.ProtocolSyntaxError) - driver_type = 'amqp' - driver_name = 'pika' - - def __init__(self, client, **kwargs): - self.client = client - self.default_port = kwargs.get('default_port', self.default_port) - - def driver_version(self): - import pika - return pika.__version__ - - def create_channel(self, connection): - return connection.channel() - - def drain_events(self, connection, **kwargs): - return connection.ensure_drain_events(**kwargs) - - def establish_connection(self): - """Establish connection to the AMQP broker.""" - conninfo = self.client - for name, default_value in self.default_connection_params.items(): - if not getattr(conninfo, name, None): - setattr(conninfo, name, default_value) - credentials = connection.PlainCredentials(conninfo.userid, - conninfo.password) - return self.Connection(self.client, - connection.ConnectionParameters( - conninfo.hostname, port=conninfo.port, - virtual_host=conninfo.virtual_host, - credentials=credentials)) - - def close_connection(self, connection): - """Close the AMQP broker connection.""" - connection.close() - - @property - def default_connection_params(self): - return {'hostname': 'localhost', 'port': self.default_port, - 'userid': 'guest', 'password': 'guest'} - - -class AsyncoreTransport(SyncTransport): - Connection = AsyncoreConnection diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py deleted file mode 100644 index 4015ecf5..00000000 --- a/kombu/transport/pika2.py +++ /dev/null @@ -1,235 +0,0 @@ -""" -kombu.transport.pika -==================== - -Pika transport. - -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - -""" -from __future__ import absolute_import - -import socket - -from operator import attrgetter - -from kombu.exceptions import StdConnectionError, StdChannelError -from kombu.utils.amq_manager import get_manager - -from . import base - -import pika -from pika import spec -from pika.adapters import blocking_connection as blocking -from pika import exceptions - -DEFAULT_PORT = 5672 -BASIC_PROPERTIES = ('content_type', 'content_encoding', - 'headers', 'delivery_mode', 'priority', - 'correlation_id', 'reply_to', 'expiration', - 'message_id', 'timestamp', 'type', 'user_id', - 'app_id', 'cluster_id') - - -class Message(base.Message): - - def __init__(self, channel, amqp_message, **kwargs): - channel_id, method, props, body = amqp_message - propdict = dict(zip(BASIC_PROPERTIES, - attrgetter(*BASIC_PROPERTIES)(props))) - - kwargs.update({'body': body, - 'delivery_tag': method.delivery_tag, - 'content_type': props.content_type, - 'content_encoding': props.content_encoding, - 'headers': props.headers, - 'properties': propdict, - 'delivery_info': dict( - consumer_tag=getattr(method, 'consumer_tag', None), - routing_key=method.routing_key, - delivery_tag=method.delivery_tag, - redelivered=method.redelivered, - exchange=method.exchange)}) - - super(Message, self).__init__(channel, **kwargs) - - -class Channel(blocking.BlockingChannel, base.StdChannel): - Message = Message - - def basic_get(self, queue, no_ack): - method = super(Channel, self).basic_get(self, queue=queue, - no_ack=no_ack) - # pika returns semi-predicates (GetEmpty/GetOk). - if isinstance(method, spec.Basic.GetEmpty): - return - return None, method, method._properties, method._body - - def queue_purge(self, queue=None, nowait=False): - return super(Channel, self).\ - queue_purge(queue=queue, nowait=nowait).method.message_count - - def basic_publish(self, message, exchange, routing_key, mandatory=False, - immediate=False): - body, properties = message - try: - return super(Channel, self).basic_publish(exchange, - routing_key, - body, - properties, - mandatory, - immediate) - finally: - # Pika does not automatically flush the outbound buffer - # TODO async: Needs to support `nowait`. - self.connection._flush_outbound() - - def basic_consume(self, queue, no_ack=False, consumer_tag=None, - callback=None, nowait=False): - - # Kombu callbacks only take a single `message` argument, - # but pika applies with 4 arguments, so need to wrap - # these into a single tuple. - def _callback_decode(channel, method, header, body): - return callback((channel, method, header, body)) - - return super(Channel, self).basic_consume( - _callback_decode, queue, no_ack, False, consumer_tag) - - def prepare_message(self, body, priority=None, - content_type=None, content_encoding=None, headers=None, - properties=None): - properties = spec.BasicProperties(priority=priority, - content_type=content_type, - content_encoding=content_encoding, - headers=headers) - return body, properties - - def message_to_python(self, raw_message): - return self.Message(channel=self, amqp_message=raw_message) - - def basic_qos(self, prefetch_size, prefetch_count, a_global=False): - return super(Channel, self).basic_qos(prefetch_size=prefetch_size, - prefetch_count=prefetch_count, - global_=a_global) - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - self.close() - - def close(self, *args): - super(Channel, self).close(*args) - self.connection = None - if getattr(self, 'handler', None): - if getattr(self.handler, 'connection', None): - self.handler.connection.channels.pop( - self.handler.channel_number, None) - self.handler.connection = None - self.handler = None - - @property - def channel_id(self): - return self.channel_number - - -class Connection(blocking.BlockingConnection): - Channel = Channel - - def __init__(self, client, *args, **kwargs): - self.client = client - super(Connection, self).__init__(*args, **kwargs) - - def channel(self): - self._channel_open = False - cid = self._next_channel_number() - - self.callbacks.add(cid, spec.Channel.CloseOk, self._on_channel_close) - transport = blocking.BlockingChannelTransport(self, cid) - channel = self._channels[cid] = self.Channel(self, cid, transport) - channel.connection = self - return channel - - def drain_events(self, timeout=None): - if timeout: - prev = self.socket.gettimeout() - self.socket.settimeout(timeout) - try: - self._handle_read() - finally: - if timeout: - self.socket.settimeout(prev) - self._flush_outbound() - - def close(self, *args): - self.client = None - super(Connection, self).close(*args) - - -AuthenticationError = getattr(exceptions, 'AuthenticationError', - getattr(exceptions, 'LoginError')) - - -class Transport(base.Transport): - Message = Message - Connection = Connection - - default_port = DEFAULT_PORT - connection_errors = (StdConnectionError, - socket.error, - exceptions.ConnectionClosed, - exceptions.ChannelClosed, - AuthenticationError, - exceptions.NoFreeChannels, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.RecursiveOperationDetected, - exceptions.ProtocolSyntaxError) - channel_errors = (StdChannelError, - exceptions.ChannelClosed, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.ProtocolSyntaxError) - driver_type = 'amqp' - driver_name = 'pika' - - def __init__(self, client, **kwargs): - self.client = client - self.default_port = kwargs.get('default_port', self.default_port) - - def driver_version(self): - return pika.__version__ - - def create_channel(self, connection): - return connection.channel() - - def drain_events(self, connection, **kwargs): - return connection.drain_events(**kwargs) - - def establish_connection(self): - """Establish connection to the AMQP broker.""" - conninfo = self.client - for name, default_value in self.default_connection_params.items(): - if not getattr(conninfo, name, None): - setattr(conninfo, name, default_value) - credentials = pika.PlainCredentials(conninfo.userid, - conninfo.password) - return self.Connection(self.client, - pika.ConnectionParameters( - conninfo.hostname, port=conninfo.port, - virtual_host=conninfo.virtual_host, - credentials=credentials)) - - def close_connection(self, connection): - """Close the AMQP broker connection.""" - connection.close() - - def get_manager(self, *args, **kwargs): - return get_manager(self.client, *args, **kwargs) - - @property - def default_connection_params(self): - return {'hostname': 'localhost', 'port': self.default_port, - 'userid': 'guest', 'password': 'guest'} diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index d03178e0..eec7d744 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -4,9 +4,6 @@ kombu.transport.pyamqp pure python amqp transport. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -23,7 +20,7 @@ from . import base DEFAULT_PORT = 5672 -if amqp.VERSION < (0, 9, 3): +if amqp.VERSION < (0, 9, 3): # pragma: no cover raise VersionMismatch('Please install amqp version 0.9.3 or higher.') diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 4badf8fd..dc242bae 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -4,9 +4,6 @@ kombu.transport.redis Redis transport. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -399,8 +396,8 @@ class Channel(virtual.Channel): c = self.subclient if c.connection._sock is None: c.connection.connect() - self.subclient.subscribe(keys) self._in_listen = True + self.subclient.subscribe(keys) def _handle_message(self, client, r): if r[0] == 'unsubscribe' and r[2] == 0: @@ -432,8 +429,8 @@ class Channel(virtual.Channel): return keys = [self._q_for_pri(queue, pri) for pri in PRIORITY_STEPS for queue in queues] + [timeout or 0] - self.client.connection.send_command('BRPOP', *keys) self._in_poll = True + self.client.connection.send_command('BRPOP', *keys) def _brpop_read(self, **options): try: @@ -563,7 +560,7 @@ class Channel(virtual.Channel): pass super(Channel, self).close() - def _create_client(self): + def _connparams(self): conninfo = self.connection.client database = conninfo.virtual_host if not isinstance(database, int): @@ -576,15 +573,16 @@ class Channel(virtual.Channel): except ValueError: raise ValueError( 'Database name must be int between 0 and limit - 1') + return {'host': conninfo.hostname or '127.0.0.1', + 'port': conninfo.port or DEFAULT_PORT, + 'db': database, + 'password': conninfo.password} - return self.Client(host=conninfo.hostname or '127.0.0.1', - port=conninfo.port or DEFAULT_PORT, - db=database, - password=conninfo.password, - connection_pool=self.pool) + def _create_client(self): + return self.Client(connection_pool=self.pool) def _get_pool(self): - return redis.ConnectionPool() + return redis.ConnectionPool(**self._connparams()) def _get_client(self): if redis.VERSION < (2, 4, 4): diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 4bf13f85..d7258b2e 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -6,9 +6,6 @@ Virtual transport implementation. Emulates the AMQ API for non-AMQ transports. -:copyright: (c) 2009, 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import, unicode_literals @@ -176,7 +173,7 @@ class QoS(object): try: self.channel._restore(message) - except BaseException as exc: + except BaseException, exc: errors.append((exc, message)) delivered.clear() return errors diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index fed253ba..bbc6a7b1 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -5,9 +5,6 @@ kombu.transport.virtual.exchange Implementations of the standard exchanges defined by the AMQ protocol (excluding the `headers` exchange). -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py index 7304f67b..bf92a3a8 100644 --- a/kombu/transport/virtual/scheduling.py +++ b/kombu/transport/virtual/scheduling.py @@ -4,9 +4,6 @@ Consumer utilities. - :copyright: (c) 2009 - 2012 by Ask Solem. - :license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py index 1c7ff4ae..c03a78fc 100644 --- a/kombu/transport/zmq.py +++ b/kombu/transport/zmq.py @@ -11,7 +11,11 @@ import errno import os import socket -import zmq +try: + import zmq + from zmq import ZMQError +except ImportError: + zmq = ZMQError = None # noqa from kombu.exceptions import StdConnectionError, StdChannelError from kombu.five import Empty @@ -128,9 +132,9 @@ class Client(object): def get(self, queue=None, timeout=None): try: return self.sink.recv(flags=zmq.NOBLOCK) - except zmq.ZMQError as exc: - if exc.errno == zmq.EAGAIN: - raise socket.error(errno.EAGAIN, exc.strerror) + except ZMQError, e: + if e.errno == zmq.EAGAIN: + raise socket.error(errno.EAGAIN, e.strerror) else: raise @@ -177,7 +181,7 @@ class Channel(virtual.Channel): def _get(self, queue, timeout=None): try: return loads(self.client.get(queue, timeout)) - except socket.error as exc: + except socket.error, exc: if exc.errno == errno.EAGAIN and timeout != 0: raise Empty() else: @@ -226,7 +230,7 @@ class Transport(virtual.Transport): driver_type = 'zeromq' driver_name = 'zmq' - connection_errors = (StdConnectionError, zmq.ZMQError,) + connection_errors = (StdConnectionError, ZMQError,) channel_errors = (StdChannelError, ) supports_ev = True @@ -234,8 +238,9 @@ class Transport(virtual.Transport): nb_keep_draining = True def __init__(self, *args, **kwargs): + if zmq is None: + raise ImportError('The zmq library is not installed') super(Transport, self).__init__(*args, **kwargs) - self.cycle = MultiChannelPoller() def driver_version(self): @@ -258,8 +263,8 @@ class Transport(virtual.Transport): for channel in connection.channels: try: evt = channel.cycle.get(timeout=timeout) - except socket.error as exc: - if exc.errno == errno.EAGAIN: + except socket.error, e: + if e.errno == errno.EAGAIN: continue raise else: diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index d7655797..8eb61185 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -34,7 +34,6 @@ Zookeeper transport. """ from __future__ import absolute_import -import kazoo import socket from anyjson import loads, dumps @@ -44,6 +43,41 @@ from kombu.five import Empty from . import virtual +try: + import kazoo + + KZ_CONNECTION_ERRORS = ( + kazoo.zkclient.SystemErrorException, + kazoo.zkclient.ConnectionLossException, + kazoo.zkclient.MarshallingErrorException, + kazoo.zkclient.UnimplementedException, + kazoo.zkclient.OperationTimeoutException, + kazoo.zkclient.NoAuthException, + kazoo.zkclient.InvalidACLException, + kazoo.zkclient.AuthFailedException, + kazoo.zkclient.SessionExpiredException, + ) + + KZ_CHANNEL_ERRORS = ( + kazoo.zkclient.RuntimeInconsistencyException, + kazoo.zkclient.DataInconsistencyException, + kazoo.zkclient.BadArgumentsException, + kazoo.zkclient.MarshallingErrorException, + kazoo.zkclient.UnimplementedException, + kazoo.zkclient.OperationTimeoutException, + kazoo.zkclient.ApiErrorException, + kazoo.zkclient.NoNodeException, + kazoo.zkclient.NoAuthException, + kazoo.zkclient.NodeExistsException, + kazoo.zkclient.NoChildrenForEphemeralsException, + kazoo.zkclient.NotEmptyException, + kazoo.zkclient.SessionExpiredException, + kazoo.zkclient.InvalidCallbackException, + ) +except ImportError: + kazoo = None # noqa + KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = () # noqa + DEFAULT_PORT = 2181 __author__ = 'Mahendra M <mahendra.m@gmail.com>' @@ -134,35 +168,16 @@ class Transport(virtual.Transport): Channel = Channel polling_interval = 1 default_port = DEFAULT_PORT - connection_errors = (StdConnectionError, - socket.error, - kazoo.zkclient.SystemErrorException, - kazoo.zkclient.ConnectionLossException, - kazoo.zkclient.MarshallingErrorException, - kazoo.zkclient.UnimplementedException, - kazoo.zkclient.OperationTimeoutException, - kazoo.zkclient.NoAuthException, - kazoo.zkclient.InvalidACLException, - kazoo.zkclient.AuthFailedException, - kazoo.zkclient.SessionExpiredException) - - channel_errors = (StdChannelError, - kazoo.zkclient.RuntimeInconsistencyException, - kazoo.zkclient.DataInconsistencyException, - kazoo.zkclient.BadArgumentsException, - kazoo.zkclient.MarshallingErrorException, - kazoo.zkclient.UnimplementedException, - kazoo.zkclient.OperationTimeoutException, - kazoo.zkclient.ApiErrorException, - kazoo.zkclient.NoNodeException, - kazoo.zkclient.NoAuthException, - kazoo.zkclient.NodeExistsException, - kazoo.zkclient.NoChildrenForEphemeralsException, - kazoo.zkclient.NotEmptyException, - kazoo.zkclient.SessionExpiredException, - kazoo.zkclient.InvalidCallbackException) + connection_errors = (StdConnectionError, ) + KZ_CONNECTION_ERRORS + channel_errors = (StdChannelError, socket.error) + KZ_CHANNEL_ERRORS driver_type = 'zookeeper' driver_name = 'kazoo' + def __init__(self, *args, **kwargs): + if kazoo is None: + raise ImportError('The kazoo library is not installed') + + super(Transport, self).__init__(*args, **kwargs) + def driver_version(self): return kazoo.__version__ diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index e18cc269..70f75573 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -4,9 +4,6 @@ kombu.utils Internal utilities. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import, print_function @@ -221,17 +218,17 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, for retries in count(): try: return fun(*args, **kwargs) - except catch as exc: + except catch, exc: if max_retries is not None and retries > max_retries: raise if callback: callback() tts = errback(exc, interval_range, retries) if errback else None if tts: - for i in fxrange(stop=tts): - if i and callback: + for i in range(int(tts / interval_step)): + if callback: callback() - sleep(i) + sleep(interval_step) def emergency_dump_state(state, open_file=open, dump=None): diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py index 0bb9ce4c..62a7a95a 100644 --- a/kombu/utils/amq_manager.py +++ b/kombu/utils/amq_manager.py @@ -6,10 +6,10 @@ def get_manager(client, hostname=None, port=None, userid=None, import pyrabbit opt = client.transport_options.get host = (hostname if hostname is not None - else opt('manager_hostname', client.hostname)) + else opt('manager_hostname', client.hostname or 'localhost')) port = port if port is not None else opt('manager_port', 55672) return pyrabbit.Client('%s:%s' % (host, port), userid if userid is not None - else opt('manager_userid', client.userid), + else opt('manager_userid', client.userid or 'guest'), password if password is not None - else opt('manager_password', client.password)) + else opt('manager_password', client.password or 'guest')) diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index 2167f03c..a2f2e984 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -4,9 +4,6 @@ kombu.utils.compat Helps compatibility with older Python versions. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py index 85793c21..aeea4e4f 100644 --- a/kombu/utils/debug.py +++ b/kombu/utils/debug.py @@ -4,9 +4,6 @@ kombu.utils.debug Debugging support. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py index 83acc1a4..64eb5a43 100644 --- a/kombu/utils/encoding.py +++ b/kombu/utils/encoding.py @@ -7,9 +7,6 @@ Utilities to encode text, and to safely emit text from running applications without crashing with the infamous :exc:`UnicodeDecodeError` exception. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index 5e276a3d..9b5458fa 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -4,9 +4,6 @@ kombu.utils.eventio Evented IO support for multiple platforms. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import @@ -73,7 +70,7 @@ class Poller(object): def poll(self, timeout): try: return self._poll(timeout) - except Exception as exc: + except Exception, exc: if get_errno(exc) != errno.EINTR: raise @@ -86,7 +83,7 @@ class _epoll(Poller): def register(self, fd, events): try: self._epoll.register(fd, events) - except Exception as exc: + except Exception, exc: if get_errno(exc) != errno.EEXIST: raise @@ -97,7 +94,7 @@ class _epoll(Poller): pass except ValueError: pass - except IOError as exc: + except IOError, exc: if get_errno(exc) != errno.ENOENT: raise diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py index c488e083..2fe8e2e5 100644 --- a/kombu/utils/limits.py +++ b/kombu/utils/limits.py @@ -4,9 +4,6 @@ kombu.utils.limits Token bucket implementation for rate limiting. -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - """ from __future__ import absolute_import diff --git a/requirements/funtest.txt b/requirements/funtest.txt index 8fdd9bef..6ac859b7 100644 --- a/requirements/funtest.txt +++ b/requirements/funtest.txt @@ -7,9 +7,6 @@ pymongo # CouchDB transport couchdb -# Pika transport -pika==0.5.2 - # Beanstalk transport beanstalkc @@ -6,15 +6,17 @@ cover3-branch = 1 cover3-html = 1 cover3-package = kombu cover3-exclude = kombu + kombu.transport.mongodb + kombu.transport.filesystem kombu.utils.compat kombu.utils.eventio kombu.utils.finalize - kombu.transport.pika kombu.transport.couchdb - kombu.transport.mongodb kombu.transport.beanstalk + kombu.transport.sqlalchemy kombu.transport.SQS kombu.transport.zookeeper + kombu.transport.zmq [build_sphinx] source-dir = docs/ @@ -42,8 +42,6 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} --cover3-xml-file={toxinidir}/coverage.xml \ --cover3-package=kombu \ --cover3-exclude="kombu kombu.utils.* \ - kombu.transport.pypika \ - kombu.transport.pycouchdb \ kombu.transport.mongodb \ kombu.transport.beanstalk \ kombu.transport.zookeeper" \ |