diff options
author | Ask Solem <ask@celeryproject.org> | 2012-08-29 15:46:46 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-08-29 15:46:46 +0100 |
commit | f1fe4e6881916e6993d7aa548f9d8601562a225e (patch) | |
tree | 9a4bb4ef2c84bbbe916f1f60c54101ec2b6ea9da | |
parent | c0244d514efe1694576683b6355ab123b809f620 (diff) | |
parent | b7c5f069a42aa400fffdf3c49f626fe7871ab8e4 (diff) | |
download | kombu-asynchronous-broadcasts.tar.gz |
Merge branch 'master' into asynchronous-broadcastsasynchronous-broadcasts
37 files changed, 1041 insertions, 94 deletions
@@ -13,6 +13,7 @@ Anton Gyllenberg <anton@iki.fi> Ask Solem <ask@celeryproject.org> Bobby Beever <bobby.beever@yahoo.com> Brian Bernstein +C Anthony Risinger <anthony+corvisa.com@xtfx.me> Christophe Chauvet <christophe.chauvet@gmail.com> Christopher Grebs <cg@webshox.org> Clay Gerrard <clay.gerrard@gmail.com> @@ -4,6 +4,164 @@ Change history ================ +.. _version-2.4.3: + +2.4.3 +===== +:release-date: 2012-08-25 10:30 P.M BST + +- Fixed problem with amqp transport alias (Issue #154). + +.. _version-2.4.2: + +2.4.2 +===== +:release-date: 2012-08-24 05:00 P.M BST + +- Having an empty transport name broke in 2.4.1. + + +.. _version-2.4.1: + +2.4.1 +===== +:release-date: 2012-08-24 04:00 P.M BST + +- Redis: Fixed race condition that could cause the consumer to crash (Issue #151) + + Often leading to the error message ``"could not convert string to float"`` + +- Connection retry could cause an inifite loop (Issue #145). + +- The ``amqp`` alias is now resolved at runtime, so that eventlet detection + works even if patching was done later. + +.. _version-2.4.0: + +2.4.0 +===== +:release-date: 2012-08-17 08:00 P.M BST + +- New experimental :mod:`ZeroMQ <kombu.transport.zmq` transport. + + Contributed by John Watson. + +- Redis: Ack timed-out messages were not restored when using the eventloop. + +- Now uses pickle protocol 2 by default to be cross-compatible with Python 3. + + The protocol can also now be changed using the :envvar:`PICKLE_PROTOCOL` + environment variable. + +- Adds ``Transport.supports_ev`` attribute. + +- Pika: Queue purge was not working properly. + + Fix contributed by Steeve Morin. + +- Pika backend was no longer working since Kombu 2.3 + + Fix contributed by Steeve Morin. + +.. _version-2.3.2: + +2.3.2 +===== +:release-date: 2012-08-01 06:00 P.M BST + +- Fixes problem with deserialization in Python 3. + +.. _version-2.3.1: + +2.3.1 +===== +:release-date: 2012-08-01 04:00 P.M BST + +- librabbitmq: Can now handle messages that does not have a + content_encoding/content_type set (Issue #149). + + Fix contributed by C Anthony Risinger. + +- Beanstalk: Now uses localhost by default if the URL does not contain a host. + +.. _version-2.3.0: + +2.3.0 +===== +:release-date: 2012-07-24 03:50 P.M BST + +- New ``pyamqp://`` transport! + + The new `py-amqp`_ library is a fork of amqplib started with the + following goals: + + - Uses AMQP 0.9.1 instead of 0.8 + - Should support all RabbitMQ extensions + - API compatible with :mod:`librabbitmq` so that it can be used + as a pure-python replacement in environments where rabbitmq-c cannot + be compiled. + + .. _`py-amqp`: http://amqp.readthedocs.org/ + + If you start using use py-amqp instead of amqplib you can enjoy many + advantages including: + + - Heartbeat support (Issue #79 + Issue #131) + - Consumer Cancel Notifications (Issue #131) + - Publisher Confirms + + amqplib has not been updated in a long while, so maintaining our own fork + ensures that we can quickly roll out new features and fixes without + resorting to monkey patching. + + To use the py-amqp transport you must install the :mod:`amqp` library:: + + $ pip install amqp + + and change the connection URL to use the correct transport:: + + >>> conn = Connection('pyamqp://guest:guest@localhost//') + + + The ``pyamqp://`` transport will be the default fallback transport + in Kombu version 3.0, when :mod:`librabbitmq` is not installed, + and librabbitmq will also be updated to support the same features. + +- Connection now supports heartbeat argument. + + If enabled you must make sure to manually maintain heartbeats + by calling the ``Connection.heartbeat_check`` at twice the rate + of the specified heartbeat interval. + + E.g. if you have ``Connection(heartbeat=10)``, + then you must call ``Connection.heartbeat_check()`` every 5 seconds. + + if the server has not sent heartbeats at a suitable rate then + the heartbeat check method must raise an error that is listed + in ``Connection.connection_errors``. + + The attribute ``Connection.supports_heartbeats`` has been added + for the ability to inspect if a transport supports heartbeats + or not. + + Calling ``heartbeat_check`` on a transport that does + not support heartbeats results in a noop operation. + +- SQS: Fixed bug with invalid characters in queue names. + + Fix contributed by Zach Smith. + +- utils.reprcall: Fixed typo where kwargs argument was an empty tuple by + default, and not an empty dict. + +.. _version-2.2.6: + +2.2.6 +===== +:release-date: 2012-07-10 17:00 P.M BST + +- Adds ``messaging.entry_to_queue`` for compat with previous versions. + .. _version-2.2.5: 2.2.5 @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.2.5 +:Version: 2.4.3 `Kombu` is a messaging framework for Python. diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 551ea1b7..c6186341 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -22,10 +22,12 @@ kombu.transport kombu.transport.amqplib kombu.transport.librabbitmq + kombu.transport.pyamqp kombu.transport.pika kombu.transport.pika2 kombu.transport.memory kombu.transport.redis + kombu.transport.zmq kombu.transport.beanstalk kombu.transport.mongodb kombu.transport.couchdb diff --git a/docs/reference/kombu.transport.pyamqp.rst b/docs/reference/kombu.transport.pyamqp.rst new file mode 100644 index 00000000..33ebf0b4 --- /dev/null +++ b/docs/reference/kombu.transport.pyamqp.rst @@ -0,0 +1,36 @@ +.. currentmodule:: kombu.transport.pyamqp + +.. automodule:: kombu.transport.pyamqp + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Connection + ---------- + + .. autoclass:: Connection + :members: + :undoc-members: + :inherited-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: + + Message + ------- + + .. autoclass:: Message + :members: + :undoc-members: + diff --git a/docs/reference/kombu.transport.zmq.rst b/docs/reference/kombu.transport.zmq.rst new file mode 100644 index 00000000..0735840b --- /dev/null +++ b/docs/reference/kombu.transport.zmq.rst @@ -0,0 +1,9 @@ +.. currentmodule:: kombu.transport.zmq + +.. automodule:: kombu.transport.zmq + + .. contents:: + :local: + + :members: + :undoc-members: diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst index c39cc3cc..d07c98a5 100644 --- a/docs/userguide/serialization.rst +++ b/docs/userguide/serialization.rst @@ -44,6 +44,10 @@ Each option has its advantages and disadvantages. smaller messages when sending binary files, and a slight speedup over `JSON` processing. + By default Kombu uses pickle protocol 2, but this can be changed + using the :envvar:`PICKLE_PROTOCOL` environment variable or by changing + the global :data:`kombu.serialization.pickle_protocol` flag. + `yaml` -- YAML has many of the same characteristics as `json`, except that it natively supports more data types (including dates, recursive references, etc.) diff --git a/funtests/tests/test_pyamqp.py b/funtests/tests/test_pyamqp.py new file mode 100644 index 00000000..8d493a80 --- /dev/null +++ b/funtests/tests/test_pyamqp.py @@ -0,0 +1,6 @@ +from funtests import transport + + +class test_pyamqp(transport.TransportCase): + transport = "pyamqp" + prefix = "pyamqp" diff --git a/kombu/__init__.py b/kombu/__init__.py index 21655fa1..6a1e2632 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 2, 5) +VERSION = (2, 4, 3) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' diff --git a/kombu/clocks.py b/kombu/clocks.py index a45bc767..aecb68fb 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -11,7 +11,7 @@ Logical Clocks and Synchronization. from __future__ import absolute_import from __future__ import with_statement -from threading import Lock +import threading __all__ = ['LamportClock'] @@ -57,7 +57,7 @@ class LamportClock(object): def __init__(self, initial_value=0): self.value = initial_value - self.mutex = Lock() + self.mutex = threading.Lock() def adjust(self, other): with self.mutex: diff --git a/kombu/compat.py b/kombu/compat.py index 88874aad..88aedaa9 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -19,6 +19,9 @@ from .entity import Exchange, Queue __all__ = ['Publisher', 'Consumer'] +# XXX compat attribute +entry_to_queue = Queue.from_dict + def _iterconsume(connection, consumer, no_ack=False, limit=None): consumer.consume(no_ack=no_ack) diff --git a/kombu/compression.py b/kombu/compression.py index 5c88fee4..c76adfc8 100644 --- a/kombu/compression.py +++ b/kombu/compression.py @@ -38,7 +38,7 @@ def register(encoder, decoder, content_type, aliases=[]): def encoders(): """Returns a list of available compression methods.""" - return _encoders.keys() + return list(_encoders) def get_encoder(t): diff --git a/kombu/connection.py b/kombu/connection.py index 5c272ddf..ac90fd2d 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -24,16 +24,19 @@ from Queue import Empty # (Issue #112) from kombu import exceptions from .log import get_logger -from .transport import AMQP_ALIAS, get_transport_cls +from .transport import get_transport_cls, supports_librabbitmq from .utils import cached_property, retry_over_time from .utils.compat import OrderedDict, LifoQueue as _LifoQueue from .utils.url import parse_url +RESOLVE_ALIASES = {'amqplib': 'amqp', + 'librabbitmq': 'amqp'} + _LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False) _LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False) __all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] -URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy']) +URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy', 'zeromq', 'zmq']) logger = get_logger(__name__) @@ -56,7 +59,12 @@ class Connection(object): :keyword transport_options: A dict of additional connection arguments to pass to alternate kombu channel implementations. Consult the transport documentation for available options. - :keyword insist: *Deprecated* + :keyword heartbeat: Heartbeat interval in int/float seconds. + Note that if heartbeats are enabled then the :meth:`heartbeat_check` + method must be called at an interval twice the frequency of the + heartbeat: e.g. if the heartbeat is 10, then the heartbeats must be + checked every 5 seconds (the rate can also be controlled by + the ``rate`` argument to :meth:`heartbeat_check``). .. note:: @@ -94,13 +102,13 @@ class Connection(object): password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, - **kwargs): + heartbeat=0, **kwargs): # have to spell the args out, just to get nice docstrings :( params = {'hostname': hostname, 'userid': userid, 'password': password, 'virtual_host': virtual_host, 'port': port, 'insist': insist, 'ssl': ssl, 'transport': transport, 'connect_timeout': connect_timeout, - 'login_method': login_method} + 'login_method': login_method, 'heartbeat': heartbeat} if hostname and '://' in hostname: if '+' in hostname[:hostname.index('://')]: # e.g. sqla+mysql://root:masterkey@localhost/ @@ -127,7 +135,10 @@ class Connection(object): self.declared_entities = set() def _init_params(self, hostname, userid, password, virtual_host, port, - insist, ssl, transport, connect_timeout, login_method): + insist, ssl, transport, connect_timeout, login_method, heartbeat): + transport = transport or 'amqp' + if transport == 'amqp' and supports_librabbitmq(): + transport = 'librabbitmq' self.hostname = hostname self.userid = userid self.password = password @@ -138,6 +149,7 @@ class Connection(object): self.connect_timeout = connect_timeout self.ssl = ssl self.transport_cls = transport + self.heartbeat = heartbeat def _debug(self, msg, ident='[Kombu connection:0x%(id)x] ', **kwargs): if self._logger: # pragma: no cover @@ -159,6 +171,20 @@ class Connection(object): '[Kombu channel:%(channel_id)s] ') return chan + def heartbeat_check(self, rate=2): + """Verify that hartbeats are sent and received. + + If the current transport does not support heartbeats then + this is a noop operation. + + :keyword rate: Rate is how often the tick is called + compared to the actual heartbeat value. E.g. if + the heartbeat is set to 3 seconds, and the tick + is called every 3 / 2 seconds, then the rate is 2. + + """ + return self.transport.heartbeat_check(self.connection, rate=rate) + def drain_events(self, **kwargs): """Wait for a single event from the server. @@ -379,11 +405,12 @@ class Connection(object): def clone(self, **kwargs): """Create a copy of the connection with the same connection settings.""" - return self.__class__(**dict(self._info(), **kwargs)) + return self.__class__(**dict(self._info(resolve=False), **kwargs)) - def _info(self): - transport_cls = self.transport_cls or 'amqp' - transport_cls = {AMQP_ALIAS: 'amqp'}.get(transport_cls, transport_cls) + def _info(self, resolve=True): + transport_cls = self.transport_cls + if resolve: + transport_cls = RESOLVE_ALIASES.get(transport_cls, transport_cls) D = self.transport.default_connection_params hostname = self.hostname or D.get('hostname') if self.uri_prefix: @@ -399,7 +426,8 @@ class Connection(object): ('connect_timeout', self.connect_timeout), ('transport_options', self.transport_options), ('login_method', self.login_method or D.get('login_method')), - ('uri_prefix', self.uri_prefix)) + ('uri_prefix', self.uri_prefix), + ('heartbeat', self.heartbeat)) return info def info(self): @@ -631,8 +659,12 @@ class Connection(object): return self.transport.eventmap(self.connection) @property + def supports_heartbeats(self): + return self.transport.supports_heartbeats + + @property def is_evented(self): - return getattr(self.transport, 'on_poll_start', None) + return self.transport.supports_ev BrokerConnection = Connection diff --git a/kombu/messaging.py b/kombu/messaging.py index c441df4c..c177c088 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -20,6 +20,9 @@ from .utils import maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] +# XXX compat attribute +entry_to_queue = Queue.from_dict + class Producer(object): """Message Producer. diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 8859dc35..49b4d4f4 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -101,6 +101,7 @@ class Node(object): arguments = body.get('arguments') if not destination or self.hostname in destination: return self.dispatch(method, arguments, reply_to) + dispatch_from_message = handle_message def reply(self, data, exchange, routing_key, **kwargs): self.mailbox._publish_reply(data, exchange, routing_key, diff --git a/kombu/serialization.py b/kombu/serialization.py index 94a57ae1..5942408b 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -11,6 +11,7 @@ Serialization utilities. from __future__ import absolute_import import codecs +import os import sys import pickle as pypickle @@ -57,16 +58,24 @@ else: # cPickle.loads does not support buffer() objects, # but we can just create a StringIO and use load. if sys.version_info[0] == 3: - from io import StringIO + from io import BytesIO else: try: - from cStringIO import StringIO # noqa + from cStringIO import StringIO as BytesIO # noqa except ImportError: - from StringIO import StringIO # noqa + from StringIO import StringIO as BytesIO # noqa + +#: Kombu requires Python 2.5 or later so we use protocol 2 by default. +#: There's a new protocol (3) but this is only supported by Python 3. +pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2)) + +#: Kombu requires Python 2.5 or later so we use protocol 2 by default. +#: There's a new protocol (3) but this is only supported by Python 3. +pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2)) def pickle_loads(s, load=pickle_load): - return load(StringIO(s)) + return load(BytesIO(s)) class SerializerRegistry(object): @@ -327,7 +336,11 @@ else: def register_pickle(): """The fastest serialization method, but restricts you to python clients.""" - registry.register('pickle', pickle.dumps, unpickle, + + def dumps(obj, dumper=pickle.dumps): + return dumper(obj, protocol=pickle_protocol) + + registry.register('pickle', dumps, unpickle, content_type='application/x-python-serialize', content_encoding='binary') diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index 1a95a661..c4b8bd81 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -7,7 +7,7 @@ import sys from kombu.serialization import (registry, register, SerializerNotInstalled, raw_encode, register_yaml, register_msgpack, - decode, bytes_t, pickle, + decode, bytes_t, pickle, pickle_protocol, unregister, register_pickle) from .utils import TestCase @@ -37,7 +37,7 @@ json_data = ('{"int": 10, "float": 3.1415926500000002, ' 'th\\u00e9 lazy dog"}') # Pickle serialization tests -pickle_data = pickle.dumps(py_data) +pickle_data = pickle.dumps(py_data, protocol=pickle_protocol) # YAML serialization tests yaml_data = ('float: 3.1415926500000002\nint: 10\n' @@ -204,9 +204,9 @@ class test_Serialization(TestCase): content_encoding='binary')) def test_pickle_encode(self): - self.assertEqual(pickle_data, - registry.encode(py_data, - serializer='pickle')[-1]) + self.assertEqual(pickle.loads(pickle_data), + pickle.loads(registry.encode(py_data, + serializer='pickle')[-1])) def test_register(self): register(None, None, None, None) diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py new file mode 100644 index 00000000..b48ec153 --- /dev/null +++ b/kombu/tests/transport/test_pyamqp.py @@ -0,0 +1,162 @@ +from __future__ import absolute_import + +import sys + +from nose import SkipTest + +try: + import amqp # noqa +except ImportError: + pyamqp = None # noqa +else: + from kombu.transport import pyamqp +from kombu.connection import Connection + +from kombu.tests.utils import TestCase +from kombu.tests.utils import mask_modules, Mock + + +class MockConnection(dict): + + def __setattr__(self, key, value): + self[key] = value + + +class test_Channel(TestCase): + + def setUp(self): + if pyamqp is None: + raise SkipTest('py-amqp not installed') + + class Channel(pyamqp.Channel): + wait_returns = [] + + def _x_open(self, *args, **kwargs): + pass + + def wait(self, *args, **kwargs): + return self.wait_returns + + def _send_method(self, *args, **kwargs): + pass + + self.conn = Mock() + self.conn.channels = {} + self.channel = Channel(self.conn, 0) + + def test_init(self): + self.assertFalse(self.channel.no_ack_consumers) + + def test_prepare_message(self): + x = self.channel.prepare_message('foobar', 10, + 'application/data', 'utf-8', + properties={}) + self.assertTrue(x) + + def test_message_to_python(self): + message = Mock() + message.headers = {} + message.properties = {} + self.assertTrue(self.channel.message_to_python(message)) + + def test_close_resolves_connection_cycle(self): + self.assertIsNotNone(self.channel.connection) + self.channel.close() + self.assertIsNone(self.channel.connection) + + def test_basic_consume_registers_ack_status(self): + self.channel.wait_returns = 'my-consumer-tag' + self.channel.basic_consume('foo', no_ack=True) + self.assertIn('my-consumer-tag', self.channel.no_ack_consumers) + + self.channel.wait_returns = 'other-consumer-tag' + self.channel.basic_consume('bar', no_ack=False) + self.assertNotIn('other-consumer-tag', self.channel.no_ack_consumers) + + self.channel.basic_cancel('my-consumer-tag') + self.assertNotIn('my-consumer-tag', self.channel.no_ack_consumers) + + +class test_Transport(TestCase): + + def setUp(self): + if pyamqp is None: + raise SkipTest('py-amqp not installed') + self.connection = Connection('pyamqp://') + self.transport = self.connection.transport + + def test_create_channel(self): + connection = Mock() + self.transport.create_channel(connection) + connection.channel.assert_called_with() + + def test_drain_events(self): + connection = Mock() + self.transport.drain_events(connection, timeout=10.0) + connection.drain_events.assert_called_with(timeout=10.0) + + def test_dnspython_localhost_resolve_bug(self): + + class Conn(object): + + def __init__(self, **kwargs): + vars(self).update(kwargs) + + self.transport.Connection = Conn + self.transport.client.hostname = 'localhost' + conn1 = self.transport.establish_connection() + self.assertEqual(conn1.host, '127.0.0.1:5672') + + self.transport.client.hostname = 'example.com' + conn2 = self.transport.establish_connection() + self.assertEqual(conn2.host, 'example.com:5672') + + def test_close_connection(self): + connection = Mock() + connection.client = Mock() + self.transport.close_connection(connection) + + self.assertIsNone(connection.client) + connection.close.assert_called_with() + + def test_verify_connection(self): + connection = Mock() + connection.channels = None + self.assertFalse(self.transport.verify_connection(connection)) + + connection.channels = {1: 1, 2: 2} + self.assertTrue(self.transport.verify_connection(connection)) + + @mask_modules('ssl') + def test_import_no_ssl(self): + pm = sys.modules.pop('amqp.connection') + try: + from amqp.connection import SSLError + self.assertEqual(SSLError.__module__, 'amqp.connection') + finally: + if pm is not None: + sys.modules['amqp.connection'] = pm + + +class test_pyamqp(TestCase): + + def setUp(self): + if pyamqp is None: + raise SkipTest('py-amqp not installed') + + def test_default_port(self): + + class Transport(pyamqp.Transport): + Connection = MockConnection + + c = Connection(port=None, transport=Transport).connect() + self.assertEqual(c['host'], + '127.0.0.1:%s' % (Transport.default_port, )) + + def test_custom_port(self): + + class Transport(pyamqp.Transport): + Connection = MockConnection + + c = Connection(port=1337, transport=Transport).connect() + self.assertEqual(c['host'], '127.0.0.1:1337') diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index e08c4c14..af716ede 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -66,7 +66,7 @@ class test_QoS(TestCase): self.q.append(i + 1, uuid()) self.assertFalse(self.q.can_consume()) - tag1 = self.q._delivered.keys()[0] + tag1 = iter(self.q._delivered).next() self.q.ack(tag1) self.assertTrue(self.q.can_consume()) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index bfed8aa0..1795fba3 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -160,13 +160,14 @@ class Channel(virtual.Channel): def _new_queue(self, queue, **kwargs): """Ensures a queue exists in SQS.""" - queue = self.queue_name_prefix + queue + # Translate to SQS name for consistency with initial + # _queue_cache population. + queue = self.entity_name(self.queue_name_prefix + queue) try: return self._queue_cache[queue] except KeyError: q = self._queue_cache[queue] = self.sqs.create_queue( - self.entity_name(queue), - self.visibility_timeout) + queue, self.visibility_timeout) return q def _queue_bind(self, *args): diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index f3d85ad5..63de88d8 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -12,19 +12,17 @@ from __future__ import absolute_import import sys -from kombu.syn import detect_environment +from kombu.syn import _detect_environment -DEFAULT_TRANSPORT = 'amqp' -AMQP_TRANSPORT = 'kombu.transport.amqplib.Transport' -AMQP_ALIAS = 'librabbitmq' -if detect_environment() == 'default': - try: - import librabbitmq # noqa - AMQP_TRANSPORT = 'kombu.transport.librabbitmq.Transport' # noqa - AMQP_ALIAS = 'amqp' # noqa - except ImportError: - pass +def supports_librabbitmq(): + if _detect_environment() == 'default': + try: + import librabbitmq # noqa + return True + except ImportError: + pass + return False def _ghettoq(name, new, alias=None): @@ -48,7 +46,8 @@ def _ghettoq(name, new, alias=None): TRANSPORT_ALIASES = { - 'amqp': AMQP_TRANSPORT, + 'amqp': 'kombu.transport.amqplib.Transport', + 'pyamqp': 'kombu.transport.pyamqp.Transport', 'amqplib': 'kombu.transport.amqplib.Transport', 'librabbitmq': 'kombu.transport.librabbitmq.Transport', 'pika': 'kombu.transport.pika2.Transport', @@ -70,6 +69,8 @@ TRANSPORT_ALIASES = { 'ghettoq.taproot.Beanstalk': _ghettoq('Beanstalk', 'beanstalk'), 'ghettoq.taproot.CouchDB': _ghettoq('CouchDB', 'couchdb'), 'filesystem': 'kombu.transport.filesystem.Transport', + 'zeromq': 'kombu.transport.zmq.Transport', + 'zmq': 'kombu.transport.zmq.Transport', } _transport_cache = {} @@ -103,7 +104,6 @@ def get_transport_cls(transport=None): the alias table will be consulted. """ - transport = transport or DEFAULT_TRANSPORT if transport not in _transport_cache: _transport_cache[transport] = _get_transport_cls(transport) return _transport_cache[transport] diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index f87269af..57304254 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -10,6 +10,7 @@ amqplib transport. """ from __future__ import absolute_import +import errno import socket try: @@ -17,6 +18,7 @@ try: except ImportError: class SSLError(Exception): # noqa pass +from struct import unpack from amqplib import client_0_8 as amqp from amqplib.client_0_8 import transport @@ -40,19 +42,47 @@ transport.AMQP_PROTOCOL_HEADER = str_to_bytes('AMQP\x01\x01\x08\x00') # - fixes warnings when socket is not connected. -class _TCPTransport(transport.TCPTransport): +class TCPTransport(transport.TCPTransport): + + def read_frame(self): + frame_type, channel, size = unpack('>BHI', self._read(7, True)) + payload = self._read(size) + ch = ord(self._read(1)) + if ch == 206: # '\xce' + return frame_type, channel, payload + else: + raise Exception( + 'Framing Error, received 0x%02x while expecting 0xce' % ch) + + def _read(self, n, initial=False): + while len(self._read_buffer) < n: + try: + s = self.sock.recv(65536) + except socket.error, exc: + if not initial and exc.errno in (errno.EAGAIN, errno.EINTR): + continue + raise + if not s: + raise IOError('Socket closed') + self._read_buffer += s + + result = self._read_buffer[:n] + self._read_buffer = self._read_buffer[n:] + + return result def __del__(self): try: - transport._AbstractTransport.__del__(self) - except socket.error: + self.close() + except Exception: pass finally: self.sock = None -transport.TCPTransport = _TCPTransport + +transport.TCPTransport = TCPTransport -class _SSLTransport(transport.SSLTransport): +class SSLTransport(transport.SSLTransport): def __init__(self, host, connect_timeout, ssl): if isinstance(ssl, dict): @@ -60,7 +90,41 @@ class _SSLTransport(transport.SSLTransport): self.sslobj = None transport._AbstractTransport.__init__(self, host, connect_timeout) -transport.SSLTransport = _SSLTransport + + def read_frame(self): + frame_type, channel, size = unpack('>BHI', self._read(7, True)) + payload = self._read(size) + ch = ord(self._read(1)) + if ch == 206: # '\xce' + return frame_type, channel, payload + else: + raise Exception( + 'Framing Error, received 0x%02x while expecting 0xce' % ch) + + def _read(self, n, initial=False): + result = '' + + while len(result) < n: + try: + s = self.sslobj.read(n - len(result)) + except socket.error, exc: + if not initial and exc.errno in (errno.EAGAIN, errno.EINTR): + continue + raise + if not s: + raise IOError('Socket closed') + result += s + + return result + + def __del__(self): + try: + self.close() + except Exception: + pass + finally: + self.sock = None +transport.SSLTransport = SSLTransport class Connection(amqp.Connection): # pragma: no cover @@ -250,6 +314,7 @@ class Transport(base.Transport): nb_keep_draining = True driver_name = "amqplib" driver_type = "amqp" + supports_ev = True def __init__(self, client, **kwargs): self.client = client diff --git a/kombu/transport/base.py b/kombu/transport/base.py index bc934a84..b4b48687 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -201,6 +201,13 @@ class Transport(object): #: Name of driver library (e.g. 'amqplib', 'redis', 'beanstalkc'). driver_name = 'N/A' + #: Whether this transports support heartbeats, + #: and that the :meth:`heartbeat_check` method has any effect. + supports_heartbeats = False + + #: Set to true if the transport supports the AIO interface. + supports_ev = False + def __init__(self, client, **kwargs): self.client = client @@ -219,6 +226,9 @@ class Transport(object): def drain_events(self, connection, **kwargs): raise NotImplementedError('Subclass responsibility') + def heartbeat_check(self, connection, rate=2): + pass + def driver_version(self): return 'N/A' @@ -227,6 +237,15 @@ class Transport(object): Unconvenient to use, and limited transport support.""" return {} + def on_poll_init(self, poller): + pass + + def on_poll_start(self): + raise NotImplementedError('transport: no eventloop support') + + def on_poll_empty(self): + pass + def verify_connection(self, connection): return True diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index 6745bc1e..0cdd2f98 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -105,8 +105,9 @@ class Channel(virtual.Channel): def _open(self): conninfo = self.connection.client + host = conninfo.hostname or 'localhost' port = conninfo.port or DEFAULT_PORT - conn = beanstalkc.Connection(host=conninfo.hostname, port=port) + conn = beanstalkc.Connection(host=host, port=port) conn.connect() return conn diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index 62baec96..a3be2d3c 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -39,9 +39,9 @@ class Message(base.Message): body=body, delivery_info=info, properties=props, - delivery_tag=info['delivery_tag'], - content_type=props['content_type'], - content_encoding=props['content_encoding'], + delivery_tag=info.get('delivery_tag'), + content_type=props.get('content_type'), + content_encoding=props.get('content_encoding'), headers=props.get('headers')) @@ -77,6 +77,7 @@ class Transport(base.Transport): driver_type = 'amqp' driver_name = 'librabbitmq' + supports_ev = True nb_keep_draining = True def __init__(self, client, **kwargs): diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py index bf1ab721..aa5ed029 100644 --- a/kombu/transport/pika2.py +++ b/kombu/transport/pika2.py @@ -67,8 +67,8 @@ class Channel(blocking.BlockingChannel, base.StdChannel): return None, method, method._properties, method._body def queue_purge(self, queue=None, nowait=False): - return super(Channel, self).queue_purge(queue=queue, - nowait=nowait).message_count + return super(Channel, self).\ + queue_purge(queue=queue, nowait=nowait).method.message_count def basic_publish(self, message, exchange, routing_key, mandatory=False, immediate=False): @@ -103,8 +103,7 @@ class Channel(blocking.BlockingChannel, base.StdChannel): properties = spec.BasicProperties(priority=priority, content_type=content_type, content_encoding=content_encoding, - headers=headers, - **properties) + headers=headers) return body, properties def message_to_python(self, raw_message): diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py new file mode 100644 index 00000000..2ad938c7 --- /dev/null +++ b/kombu/transport/pyamqp.py @@ -0,0 +1,136 @@ +""" +kombu.transport.pyamqp +====================== + +pure python amqp transport. + +:copyright: (c) 2009 - 2012 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" +from __future__ import absolute_import + +import amqp + +from kombu.exceptions import StdChannelError +from kombu.utils.amq_manager import get_manager + +from . import base + +DEFAULT_PORT = 5672 + + +class Message(base.Message): + + def __init__(self, channel, msg, **kwargs): + props = msg.properties + super(Message, self).__init__(channel, + body=msg.body, + delivery_tag=msg.delivery_tag, + content_type=props.get('content_type'), + content_encoding=props.get('content_encoding'), + delivery_info=msg.delivery_info, + properties=msg.properties, + headers=props.get('application_headers') or {}, + **kwargs) + + +class Channel(amqp.Channel, base.StdChannel): + Message = Message + + def prepare_message(self, message_data, priority=None, + content_type=None, content_encoding=None, headers=None, + properties=None): + """Encapsulate data into a AMQP message.""" + return amqp.Message(message_data, priority=priority, + content_type=content_type, + content_encoding=content_encoding, + application_headers=headers, + **properties) + + def message_to_python(self, raw_message): + """Convert encoded message body back to a Python value.""" + return self.Message(self, raw_message) + + +class Connection(amqp.Connection): + Channel = Channel + + +class Transport(base.Transport): + Connection = Connection + + default_port = DEFAULT_PORT + + # it's very annoying that pyamqp sometimes raises AttributeError + # if the connection is lost, but nothing we can do about that here. + connection_errors = amqp.Connection.connection_errors + channel_errors = (StdChannelError, ) + amqp.Connection.channel_errors + + nb_keep_draining = True + driver_name = "py-amqp" + driver_type = "amqp" + supports_heartbeats = True + supports_ev = True + + def __init__(self, client, **kwargs): + self.client = client + self.default_port = kwargs.get("default_port") or self.default_port + + def create_channel(self, connection): + return connection.channel() + + def drain_events(self, connection, **kwargs): + return connection.drain_events(**kwargs) + + def establish_connection(self): + """Establish connection to the AMQP broker.""" + conninfo = self.client + for name, default_value in self.default_connection_params.items(): + if not getattr(conninfo, name, None): + setattr(conninfo, name, default_value) + if conninfo.hostname == 'localhost': + conninfo.hostname = '127.0.0.1' + conn = self.Connection(host=conninfo.host, + userid=conninfo.userid, + password=conninfo.password, + login_method=conninfo.login_method, + virtual_host=conninfo.virtual_host, + insist=conninfo.insist, + ssl=conninfo.ssl, + connect_timeout=conninfo.connect_timeout, + heartbeat=conninfo.heartbeat) + conn.client = self.client + return conn + + def close_connection(self, connection): + """Close the AMQP broker connection.""" + connection.client = None + connection.close() + + def is_alive(self, connection): + return connection.is_alive() + + def verify_connection(self, connection): + return connection.channels is not None and self.is_alive(connection) + + def eventmap(self, connection): + return {connection.sock: self.client.drain_nowait} + + def on_poll_init(self, poller): + pass + + def on_poll_start(self): + return {} + + def heartbeat_check(self, connection, rate=2): + return connection.heartbeat_tick(rate=rate) + + @property + def default_connection_params(self): + return {'userid': 'guest', 'password': 'guest', + 'port': self.default_port, + 'hostname': 'localhost', 'login_method': 'AMQPLAIN'} + + def get_manager(self, *args, **kwargs): + return get_manager(self.client, *args, **kwargs) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 9125c55e..5b826bfe 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -76,7 +76,7 @@ class QoS(virtual.QoS): super(QoS, self).append(message, delivery_tag) def restore_unacked(self): - for tag in self._delivered.iterkeys(): + for tag in self._delivered: self.restore_by_tag(tag) self._delivered.clear() @@ -111,7 +111,7 @@ class QoS(virtual.QoS): M, EX, RK = loads(p) self.channel._do_restore_message(M, EX, RK) - @cached_property + @property def client(self): return self.channel._avail_client @@ -198,6 +198,17 @@ class MultiChannelPoller(object): if channel.active_fanout_queues: # LISTEN mode? self._register_LISTEN(channel) + def on_poll_init(self, poller): + self.poller = poller + for channel in self._channels: + channel.qos.restore_visible() + + def on_poll_empty(self): + for channel in self._channels: + if channel.active_queues: + # only need to do this once, as they are not local to channel. + return channel.qos.restore_visible() + def handle_event(self, fileno, event): if event & READ: chan, type = self._fd_to_chan[fileno] @@ -223,9 +234,8 @@ class MultiChannelPoller(object): # - no new data, so try to restore messages. # - reset active redis commands. - for channel in self._channels: - if channel.active_queues: - channel.qos.restore_visible() + self.on_poll_empty() + raise Empty() @property @@ -289,7 +299,7 @@ class Channel(virtual.Channel): def _restore(self, message, payload=None): tag = message.delivery_tag - P, _ = self.client.pipeline() \ + P, _ = self._avail_client.pipeline() \ .hget(self.unacked_key, tag) \ .hdel(self.unacked_key, tag) \ .execute() @@ -460,7 +470,7 @@ class Channel(virtual.Channel): def get_table(self, exchange): key = self.keyprefix_queue % exchange - values = self.client.smembers(key) + values = self._avail_client.smembers(key) if not values: raise InconsistencyError( 'Queue list empty or key does not exist: %r' % ( @@ -481,7 +491,7 @@ class Channel(virtual.Channel): self.connection.cycle.discard(self) # delete fanout bindings - for queue in self._fanout_queues.iterkeys(): + for queue in self._fanout_queues: if queue in self.auto_delete_queues: self.queue_delete(queue) @@ -599,6 +609,7 @@ class Transport(virtual.Transport): polling_interval = None # disable sleep between unsuccessful polls. default_port = DEFAULT_PORT + supports_ev = True driver_type = 'redis' driver_name = 'redis' @@ -615,7 +626,7 @@ class Transport(virtual.Transport): def on_poll_init(self, poller): """Called when hub starts.""" - self.cycle.poller = poller + self.cycle.on_poll_init(poller) def on_poll_start(self): """Called by hub before each ``poll()``""" @@ -623,6 +634,9 @@ class Transport(virtual.Transport): cycle.on_poll_start() return dict((fd, self.handle_event) for fd in cycle.fds) + def on_poll_empty(self): + self.cycle.on_poll_empty() + def handle_event(self, fileno, event): """Handle AIO event for one of our file descriptors.""" ret = self.cycle.handle_event(fileno, event) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 6f8087b9..fe2983ed 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -433,7 +433,7 @@ class Channel(AbstractChannel, base.StdChannel): self._queue_bind(exchange, *meta) def list_bindings(self): - for exchange in self.get_exchanges(): + for exchange in self.state.exchanges: table = self.get_table(exchange) for routing_key, pattern, queue in table: yield queue, exchange, routing_key @@ -512,7 +512,7 @@ class Channel(AbstractChannel, base.StdChannel): self.qos.prefetch_count = prefetch_count def get_exchanges(self): - return self.state.exchanges.keys() + return list(self.state.exchanges) def get_table(self, exchange): """Get table of bindings for `exchange`.""" diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py new file mode 100644 index 00000000..369e8ff8 --- /dev/null +++ b/kombu/transport/zmq.py @@ -0,0 +1,290 @@ +""" +kombu.transport.zmq +=================== + +ZeroMQ transport. + +""" +from __future__ import absolute_import + +import errno +import os +import socket + +from cPickle import loads, dumps +from Queue import Empty + +import zmq + +from kombu.exceptions import StdChannelError +from kombu.log import get_logger +from kombu.utils import cached_property +from kombu.utils.eventio import poll, READ + +from . import virtual + +logger = get_logger('kombu.transport.zmq') + +DEFAULT_PORT = 5555 +DEFAULT_HWM = 128 +DEFAULT_INCR = 1 + + +class MultiChannelPoller(object): + eventflags = READ + + def __init__(self): + # active channels + self._channels = set() + # file descriptor -> channel map + self._fd_to_chan = {} + # poll implementation (epoll/kqueue/select) + self.poller = poll() + + def close(self): + for fd in self._fd_to_chan: + try: + self.poller.unregister(fd) + except KeyError: + pass + self._channels.clear() + self._fd_to_chan.clear() + self.poller = None + + def add(self, channel): + self._channels.add(channel) + + def discard(self, channel): + self._channels.discard(channel) + self._fd_to_chan.pop(channel.client.connection.fd, None) + + def _register(self, channel): + conn = channel.client.connection + self._fd_to_chan[conn.fd] = channel + self.poller.register(conn.fd, self.eventflags) + + def on_poll_start(self): + for channel in self._channels: + self._register(channel) + + def handle_event(self, fileno, event): + chan = self._fd_to_chan[fileno] + return (chan.drain_events(), chan) + + def get(self, timeout=None): + self.on_poll_start() + + events = self.poller.poll(timeout) + for fileno, event in events or []: + return self.handle_event(fileno, event) + + raise Empty() + + @property + def fds(self): + return self._fd_to_chan + + +class Client(object): + def __init__(self, uri='tcp://127.0.0.1', port=DEFAULT_PORT, + hwm=DEFAULT_HWM, swap_size=None, enable_sink=True, context=None): + try: + scheme, parts = uri.split('://') + except ValueError: + scheme = 'tcp' + parts = uri + endpoints = parts.split(';') + + if scheme != 'tcp': + raise NotImplementedError('Currently only TCP can be used') + + self.context = context or zmq.Context.instance() + + if enable_sink: + self.sink = self.context.socket(zmq.PULL) + self.sink.bind('tcp://*:%s' % port) + else: + self.sink = None + + self.vent = self.context.socket(zmq.PUSH) + self.vent.setsockopt(zmq.HWM, hwm) + if swap_size: + self.vent.setsockopt(zmq.SWAP, swap_size) + + for endpoint in endpoints: + if scheme == 'tcp' and ':' not in endpoint: + endpoint += ':' + str(DEFAULT_PORT) + + endpoint = ''.join([scheme, '://', endpoint]) + + self.connect(endpoint) + + def connect(self, endpoint): + self.vent.connect(endpoint) + + def get(self, queue=None, timeout=None): + try: + return self.sink.recv(flags=zmq.NOBLOCK) + except zmq.ZMQError, e: + if e.errno == zmq.EAGAIN: + raise socket.error(errno.EAGAIN, e.strerror) + else: + raise + + def put(self, queue, message, **kwargs): + return self.vent.send(message) + + def close(self): + if self.sink and not self.sink.closed: + self.sink.close() + if not self.vent.closed: + self.vent.close() + + @property + def connection(self): + if self.sink: + return self.sink + return self.vent + + +class Channel(virtual.Channel): + Client = Client + + hwm = DEFAULT_HWM + swap_size = None + enable_sink = True + port_incr = DEFAULT_INCR + + from_transport_options = (virtual.Channel.from_transport_options + + ('hwm', + 'swap_size', + 'enable_sink', + 'port_incr')) + + def __init__(self, *args, **kwargs): + super_ = super(Channel, self) + super_.__init__(*args, **kwargs) + + # Evaluate socket + self.client.connection.closed + + self.connection.cycle.add(self) + self.connection_errors = self.connection.connection_errors + + def _get(self, queue, timeout=None): + try: + return loads(self.client.get(queue, timeout)) + except socket.error, exc: + if exc.errno == errno.EAGAIN and timeout != 0: + raise Empty() + else: + raise + + def _put(self, queue, message, **kwargs): + self.client.put(queue, dumps(message, -1), **kwargs) + + def _purge(self, queue): + return 0 + + def _poll(self, cycle, timeout=None): + return cycle.get(timeout=timeout) + + def close(self): + if not self.closed: + self.connection.cycle.discard(self) + try: + self.__dict__['client'].close() + except KeyError: + pass + super(Channel, self).close() + + def _prepare_port(self, port): + return (port + self.channel_id - 1) * self.port_incr + + def _create_client(self): + conninfo = self.connection.client + port = self._prepare_port(conninfo.port or DEFAULT_PORT) + return self.Client(uri=conninfo.hostname or 'tcp://127.0.0.1', + port=port, + hwm=self.hwm, + swap_size=self.swap_size, + enable_sink=self.enable_sink, + context=self.connection.context) + + @cached_property + def client(self): + return self._create_client() + + +class Transport(virtual.Transport): + Channel = Channel + + default_port = DEFAULT_PORT + driver_type = 'zeromq' + driver_name = 'zmq' + + connection_errors = (zmq.ZMQError,) + channel_errors = (zmq.ZMQError, StdChannelError,) + + supports_ev = True + polling_interval = None + nb_keep_draining = True + + def __init__(self, *args, **kwargs): + super(Transport, self).__init__(*args, **kwargs) + + self.cycle = MultiChannelPoller() + + def driver_version(self): + return zmq.__version__ + + def on_poll_init(self, poller): + self.cycle.poller = poller + + def on_poll_start(self): + cycle = self.cycle + cycle.on_poll_start() + return dict((fd, self.handle_event) for fd in cycle.fds) + + def handle_event(self, fileno, event): + evt = self.cycle.handle_event(fileno, event) + self._handle_event(evt) + + def drain_events(self, connection, timeout=None): + more_to_read = False + for channel in connection.channels: + try: + evt = channel.cycle.get(timeout=timeout) + except socket.error, e: + if e.errno == errno.EAGAIN: + continue + raise + else: + connection._handle_event((evt, channel)) + more_to_read = True + if not more_to_read: + raise socket.error(errno.EAGAIN, os.strerror(errno.EAGAIN)) + + def _handle_event(self, evt): + item, channel = evt + message, queue = item + if not queue or queue not in self._callbacks: + raise KeyError( + "Received message for queue '%s' without consumers: %s" % ( + queue, message)) + self._callbacks[queue](message) + + def establish_connection(self): + self.context.closed + return super(Transport, self).establish_connection() + + def close_connection(self, connection): + super(Transport, self).close_connection(connection) + try: + connection.__dict__['context'].term() + except KeyError: + pass + + @cached_property + def context(self): + return zmq.Context(1) diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 8e230afe..b402ff6c 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -153,7 +153,6 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, interval_range = fxrange(interval_start, interval_max + interval_start, interval_step, repeatlast=True) - for retries, interval in enumerate(interval_range): # for infinity try: return fun(*args, **kwargs) @@ -164,10 +163,10 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, callback() if errback: errback(exc, interval) - for i in fxrange(stop=interval): + for i in fxrange(stop=interval or 1.0): if i and callback: callback() - sleep(1.0) + sleep(i) def emergency_dump_state(state, open_file=open, dump=None): @@ -264,7 +263,7 @@ def reprkwargs(kwargs, sep=', ', fmt='%s=%s'): return sep.join(fmt % (k, _safe_repr(v)) for k, v in kwargs.iteritems()) -def reprcall(name, args=(), kwargs=(), sep=', '): +def reprcall(name, args=(), kwargs={}, sep=', '): return '%s(%s%s%s)' % (name, sep.join(map(_safe_repr, args or ())), (args and kwargs) and sep or '', reprkwargs(kwargs, sep)) diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt new file mode 100644 index 00000000..19049d44 --- /dev/null +++ b/requirements/test-ci.txt @@ -0,0 +1,3 @@ +coverage>=3.0 +PyYAML +msgpack-python<0.2.0 # 0.2.0 dropped 2.5 support diff --git a/requirements/test-jython.txt b/requirements/test-jython.txt deleted file mode 100644 index 8931aca4..00000000 --- a/requirements/test-jython.txt +++ /dev/null @@ -1,6 +0,0 @@ -nose -nose-cover3 -unittest2>=0.5.0 -coverage>=3.0 -mock>=0.7.0 -simplejson diff --git a/requirements/test-py3k.txt b/requirements/test-py3k.txt index 87e69702..3b76ba50 100644 --- a/requirements/test-py3k.txt +++ b/requirements/test-py3k.txt @@ -1,5 +1,3 @@ distribute nose -nose-cover3 -coverage>=3.0 mock>=0.7.0 diff --git a/requirements/test-pypy.txt b/requirements/test-pypy.txt deleted file mode 100644 index c0af83aa..00000000 --- a/requirements/test-pypy.txt +++ /dev/null @@ -1,7 +0,0 @@ -nose -nose-cover3 -unittest2>=0.5.0 -coverage>=3.0 -mock>=0.7.0 -simplejson -PyYAML==3.09 # 3.10 dropped 2.4 support diff --git a/requirements/test.txt b/requirements/test.txt index 75350c84..5cf3124c 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -1,8 +1,4 @@ nose nose-cover3 unittest2>=0.5.0 -coverage>=3.0 mock -simplejson -PyYAML -msgpack-python<0.2.0 # 0.2.0 dropped 2.5 support @@ -12,6 +12,7 @@ recreate = True basepython = python3.2 changedir = .tox deps = -r{toxinidir}/requirements/default.txt + -r{toxinidir}/requirements/test-ci.txt commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} {envbindir}/easy_install -U distribute {envbindir}/pip install \ @@ -39,6 +40,7 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} basepython = python2.7 deps = -r{toxinidir}/requirements/default.txt -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-ci.txt commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} nosetests --with-xunit \ --xunit-file={toxinidir}/nosetests.xml \ @@ -50,6 +52,7 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} basepython = python2.6 deps = -r{toxinidir}/requirements/default.txt -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-ci.txt commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} nosetests --with-xunit \ --xunit-file={toxinidir}/nosetests.xml \ @@ -61,6 +64,8 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} basepython = python2.5 deps = -r{toxinidir}/requirements/default.txt -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-py25.txt + -r{toxinidir}/requirements/test-ci.txt commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} nosetests --with-xunit \ --xunit-file={toxinidir}/nosetests.xml \ @@ -71,7 +76,8 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} [testenv:pypy] basepython = pypy deps = -r{toxinidir}/requirements/default.txt - -r{toxinidir}/requirements/test-pypy.txt + -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-ci.txt commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} nosetests --with-xunit \ --xunit-file={toxinidir}/nosetests.xml \ @@ -84,6 +90,8 @@ basepython = jython recreate = True where = .tox deps = -r{toxinidir}/requirements/default.txt - -r{toxinidir}/requirements/test-jython.txt + -r{toxinidir}/requirements/test.txt + -r{toxinidir}/requirements/test-py25.txt + -r{toxinidir}/requirements/test-ci.txt commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} {toxinidir}/extra/release/jython-run-tests {toxinidir} |