summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-08-29 15:46:46 +0100
committerAsk Solem <ask@celeryproject.org>2012-08-29 15:46:46 +0100
commitf1fe4e6881916e6993d7aa548f9d8601562a225e (patch)
tree9a4bb4ef2c84bbbe916f1f60c54101ec2b6ea9da
parentc0244d514efe1694576683b6355ab123b809f620 (diff)
parentb7c5f069a42aa400fffdf3c49f626fe7871ab8e4 (diff)
downloadkombu-asynchronous-broadcasts.tar.gz
Merge branch 'master' into asynchronous-broadcastsasynchronous-broadcasts
-rw-r--r--AUTHORS1
-rw-r--r--Changelog158
-rw-r--r--README.rst2
-rw-r--r--docs/reference/index.rst2
-rw-r--r--docs/reference/kombu.transport.pyamqp.rst36
-rw-r--r--docs/reference/kombu.transport.zmq.rst9
-rw-r--r--docs/userguide/serialization.rst4
-rw-r--r--funtests/tests/test_pyamqp.py6
-rw-r--r--kombu/__init__.py2
-rw-r--r--kombu/clocks.py4
-rw-r--r--kombu/compat.py3
-rw-r--r--kombu/compression.py2
-rw-r--r--kombu/connection.py56
-rw-r--r--kombu/messaging.py3
-rw-r--r--kombu/pidbox.py1
-rw-r--r--kombu/serialization.py23
-rw-r--r--kombu/tests/test_serialization.py10
-rw-r--r--kombu/tests/transport/test_pyamqp.py162
-rw-r--r--kombu/tests/transport/virtual/test_base.py2
-rw-r--r--kombu/transport/SQS.py7
-rw-r--r--kombu/transport/__init__.py26
-rw-r--r--kombu/transport/amqplib.py77
-rw-r--r--kombu/transport/base.py19
-rw-r--r--kombu/transport/beanstalk.py3
-rw-r--r--kombu/transport/librabbitmq.py7
-rw-r--r--kombu/transport/pika2.py7
-rw-r--r--kombu/transport/pyamqp.py136
-rw-r--r--kombu/transport/redis.py32
-rw-r--r--kombu/transport/virtual/__init__.py4
-rw-r--r--kombu/transport/zmq.py290
-rw-r--r--kombu/utils/__init__.py7
-rw-r--r--requirements/test-ci.txt3
-rw-r--r--requirements/test-jython.txt6
-rw-r--r--requirements/test-py3k.txt2
-rw-r--r--requirements/test-pypy.txt7
-rw-r--r--requirements/test.txt4
-rw-r--r--tox.ini12
37 files changed, 1041 insertions, 94 deletions
diff --git a/AUTHORS b/AUTHORS
index 8aedb36d..9b1556dc 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -13,6 +13,7 @@ Anton Gyllenberg <anton@iki.fi>
Ask Solem <ask@celeryproject.org>
Bobby Beever <bobby.beever@yahoo.com>
Brian Bernstein
+C Anthony Risinger <anthony+corvisa.com@xtfx.me>
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com>
diff --git a/Changelog b/Changelog
index 064c0d2e..15db95b2 100644
--- a/Changelog
+++ b/Changelog
@@ -4,6 +4,164 @@
Change history
================
+.. _version-2.4.3:
+
+2.4.3
+=====
+:release-date: 2012-08-25 10:30 P.M BST
+
+- Fixed problem with amqp transport alias (Issue #154).
+
+.. _version-2.4.2:
+
+2.4.2
+=====
+:release-date: 2012-08-24 05:00 P.M BST
+
+- Having an empty transport name broke in 2.4.1.
+
+
+.. _version-2.4.1:
+
+2.4.1
+=====
+:release-date: 2012-08-24 04:00 P.M BST
+
+- Redis: Fixed race condition that could cause the consumer to crash (Issue #151)
+
+ Often leading to the error message ``"could not convert string to float"``
+
+- Connection retry could cause an inifite loop (Issue #145).
+
+- The ``amqp`` alias is now resolved at runtime, so that eventlet detection
+ works even if patching was done later.
+
+.. _version-2.4.0:
+
+2.4.0
+=====
+:release-date: 2012-08-17 08:00 P.M BST
+
+- New experimental :mod:`ZeroMQ <kombu.transport.zmq` transport.
+
+ Contributed by John Watson.
+
+- Redis: Ack timed-out messages were not restored when using the eventloop.
+
+- Now uses pickle protocol 2 by default to be cross-compatible with Python 3.
+
+ The protocol can also now be changed using the :envvar:`PICKLE_PROTOCOL`
+ environment variable.
+
+- Adds ``Transport.supports_ev`` attribute.
+
+- Pika: Queue purge was not working properly.
+
+ Fix contributed by Steeve Morin.
+
+- Pika backend was no longer working since Kombu 2.3
+
+ Fix contributed by Steeve Morin.
+
+.. _version-2.3.2:
+
+2.3.2
+=====
+:release-date: 2012-08-01 06:00 P.M BST
+
+- Fixes problem with deserialization in Python 3.
+
+.. _version-2.3.1:
+
+2.3.1
+=====
+:release-date: 2012-08-01 04:00 P.M BST
+
+- librabbitmq: Can now handle messages that does not have a
+ content_encoding/content_type set (Issue #149).
+
+ Fix contributed by C Anthony Risinger.
+
+- Beanstalk: Now uses localhost by default if the URL does not contain a host.
+
+.. _version-2.3.0:
+
+2.3.0
+=====
+:release-date: 2012-07-24 03:50 P.M BST
+
+- New ``pyamqp://`` transport!
+
+ The new `py-amqp`_ library is a fork of amqplib started with the
+ following goals:
+
+ - Uses AMQP 0.9.1 instead of 0.8
+ - Should support all RabbitMQ extensions
+ - API compatible with :mod:`librabbitmq` so that it can be used
+ as a pure-python replacement in environments where rabbitmq-c cannot
+ be compiled.
+
+ .. _`py-amqp`: http://amqp.readthedocs.org/
+
+ If you start using use py-amqp instead of amqplib you can enjoy many
+ advantages including:
+
+ - Heartbeat support (Issue #79 + Issue #131)
+ - Consumer Cancel Notifications (Issue #131)
+ - Publisher Confirms
+
+ amqplib has not been updated in a long while, so maintaining our own fork
+ ensures that we can quickly roll out new features and fixes without
+ resorting to monkey patching.
+
+ To use the py-amqp transport you must install the :mod:`amqp` library::
+
+ $ pip install amqp
+
+ and change the connection URL to use the correct transport::
+
+ >>> conn = Connection('pyamqp://guest:guest@localhost//')
+
+
+ The ``pyamqp://`` transport will be the default fallback transport
+ in Kombu version 3.0, when :mod:`librabbitmq` is not installed,
+ and librabbitmq will also be updated to support the same features.
+
+- Connection now supports heartbeat argument.
+
+ If enabled you must make sure to manually maintain heartbeats
+ by calling the ``Connection.heartbeat_check`` at twice the rate
+ of the specified heartbeat interval.
+
+ E.g. if you have ``Connection(heartbeat=10)``,
+ then you must call ``Connection.heartbeat_check()`` every 5 seconds.
+
+ if the server has not sent heartbeats at a suitable rate then
+ the heartbeat check method must raise an error that is listed
+ in ``Connection.connection_errors``.
+
+ The attribute ``Connection.supports_heartbeats`` has been added
+ for the ability to inspect if a transport supports heartbeats
+ or not.
+
+ Calling ``heartbeat_check`` on a transport that does
+ not support heartbeats results in a noop operation.
+
+- SQS: Fixed bug with invalid characters in queue names.
+
+ Fix contributed by Zach Smith.
+
+- utils.reprcall: Fixed typo where kwargs argument was an empty tuple by
+ default, and not an empty dict.
+
+.. _version-2.2.6:
+
+2.2.6
+=====
+:release-date: 2012-07-10 17:00 P.M BST
+
+- Adds ``messaging.entry_to_queue`` for compat with previous versions.
+
.. _version-2.2.5:
2.2.5
diff --git a/README.rst b/README.rst
index da2df132..1aa90c5d 100644
--- a/README.rst
+++ b/README.rst
@@ -2,7 +2,7 @@
kombu - Messaging Framework for Python
========================================
-:Version: 2.2.5
+:Version: 2.4.3
`Kombu` is a messaging framework for Python.
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 551ea1b7..c6186341 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -22,10 +22,12 @@
kombu.transport
kombu.transport.amqplib
kombu.transport.librabbitmq
+ kombu.transport.pyamqp
kombu.transport.pika
kombu.transport.pika2
kombu.transport.memory
kombu.transport.redis
+ kombu.transport.zmq
kombu.transport.beanstalk
kombu.transport.mongodb
kombu.transport.couchdb
diff --git a/docs/reference/kombu.transport.pyamqp.rst b/docs/reference/kombu.transport.pyamqp.rst
new file mode 100644
index 00000000..33ebf0b4
--- /dev/null
+++ b/docs/reference/kombu.transport.pyamqp.rst
@@ -0,0 +1,36 @@
+.. currentmodule:: kombu.transport.pyamqp
+
+.. automodule:: kombu.transport.pyamqp
+
+ .. contents::
+ :local:
+
+ Transport
+ ---------
+
+ .. autoclass:: Transport
+ :members:
+ :undoc-members:
+
+ Connection
+ ----------
+
+ .. autoclass:: Connection
+ :members:
+ :undoc-members:
+ :inherited-members:
+
+ Channel
+ -------
+
+ .. autoclass:: Channel
+ :members:
+ :undoc-members:
+
+ Message
+ -------
+
+ .. autoclass:: Message
+ :members:
+ :undoc-members:
+
diff --git a/docs/reference/kombu.transport.zmq.rst b/docs/reference/kombu.transport.zmq.rst
new file mode 100644
index 00000000..0735840b
--- /dev/null
+++ b/docs/reference/kombu.transport.zmq.rst
@@ -0,0 +1,9 @@
+.. currentmodule:: kombu.transport.zmq
+
+.. automodule:: kombu.transport.zmq
+
+ .. contents::
+ :local:
+
+ :members:
+ :undoc-members:
diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst
index c39cc3cc..d07c98a5 100644
--- a/docs/userguide/serialization.rst
+++ b/docs/userguide/serialization.rst
@@ -44,6 +44,10 @@ Each option has its advantages and disadvantages.
smaller messages when sending binary files, and a slight speedup
over `JSON` processing.
+ By default Kombu uses pickle protocol 2, but this can be changed
+ using the :envvar:`PICKLE_PROTOCOL` environment variable or by changing
+ the global :data:`kombu.serialization.pickle_protocol` flag.
+
`yaml` -- YAML has many of the same characteristics as `json`,
except that it natively supports more data types (including dates,
recursive references, etc.)
diff --git a/funtests/tests/test_pyamqp.py b/funtests/tests/test_pyamqp.py
new file mode 100644
index 00000000..8d493a80
--- /dev/null
+++ b/funtests/tests/test_pyamqp.py
@@ -0,0 +1,6 @@
+from funtests import transport
+
+
+class test_pyamqp(transport.TransportCase):
+ transport = "pyamqp"
+ prefix = "pyamqp"
diff --git a/kombu/__init__.py b/kombu/__init__.py
index 21655fa1..6a1e2632 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -1,7 +1,7 @@
"""Messaging Framework for Python"""
from __future__ import absolute_import
-VERSION = (2, 2, 5)
+VERSION = (2, 4, 3)
__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
diff --git a/kombu/clocks.py b/kombu/clocks.py
index a45bc767..aecb68fb 100644
--- a/kombu/clocks.py
+++ b/kombu/clocks.py
@@ -11,7 +11,7 @@ Logical Clocks and Synchronization.
from __future__ import absolute_import
from __future__ import with_statement
-from threading import Lock
+import threading
__all__ = ['LamportClock']
@@ -57,7 +57,7 @@ class LamportClock(object):
def __init__(self, initial_value=0):
self.value = initial_value
- self.mutex = Lock()
+ self.mutex = threading.Lock()
def adjust(self, other):
with self.mutex:
diff --git a/kombu/compat.py b/kombu/compat.py
index 88874aad..88aedaa9 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -19,6 +19,9 @@ from .entity import Exchange, Queue
__all__ = ['Publisher', 'Consumer']
+# XXX compat attribute
+entry_to_queue = Queue.from_dict
+
def _iterconsume(connection, consumer, no_ack=False, limit=None):
consumer.consume(no_ack=no_ack)
diff --git a/kombu/compression.py b/kombu/compression.py
index 5c88fee4..c76adfc8 100644
--- a/kombu/compression.py
+++ b/kombu/compression.py
@@ -38,7 +38,7 @@ def register(encoder, decoder, content_type, aliases=[]):
def encoders():
"""Returns a list of available compression methods."""
- return _encoders.keys()
+ return list(_encoders)
def get_encoder(t):
diff --git a/kombu/connection.py b/kombu/connection.py
index 5c272ddf..ac90fd2d 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -24,16 +24,19 @@ from Queue import Empty
# (Issue #112)
from kombu import exceptions
from .log import get_logger
-from .transport import AMQP_ALIAS, get_transport_cls
+from .transport import get_transport_cls, supports_librabbitmq
from .utils import cached_property, retry_over_time
from .utils.compat import OrderedDict, LifoQueue as _LifoQueue
from .utils.url import parse_url
+RESOLVE_ALIASES = {'amqplib': 'amqp',
+ 'librabbitmq': 'amqp'}
+
_LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False)
_LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False)
__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']
-URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy'])
+URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy', 'zeromq', 'zmq'])
logger = get_logger(__name__)
@@ -56,7 +59,12 @@ class Connection(object):
:keyword transport_options: A dict of additional connection arguments to
pass to alternate kombu channel implementations. Consult the transport
documentation for available options.
- :keyword insist: *Deprecated*
+ :keyword heartbeat: Heartbeat interval in int/float seconds.
+ Note that if heartbeats are enabled then the :meth:`heartbeat_check`
+ method must be called at an interval twice the frequency of the
+ heartbeat: e.g. if the heartbeat is 10, then the heartbeats must be
+ checked every 5 seconds (the rate can also be controlled by
+ the ``rate`` argument to :meth:`heartbeat_check``).
.. note::
@@ -94,13 +102,13 @@ class Connection(object):
password=None, virtual_host=None, port=None, insist=False,
ssl=False, transport=None, connect_timeout=5,
transport_options=None, login_method=None, uri_prefix=None,
- **kwargs):
+ heartbeat=0, **kwargs):
# have to spell the args out, just to get nice docstrings :(
params = {'hostname': hostname, 'userid': userid,
'password': password, 'virtual_host': virtual_host,
'port': port, 'insist': insist, 'ssl': ssl,
'transport': transport, 'connect_timeout': connect_timeout,
- 'login_method': login_method}
+ 'login_method': login_method, 'heartbeat': heartbeat}
if hostname and '://' in hostname:
if '+' in hostname[:hostname.index('://')]:
# e.g. sqla+mysql://root:masterkey@localhost/
@@ -127,7 +135,10 @@ class Connection(object):
self.declared_entities = set()
def _init_params(self, hostname, userid, password, virtual_host, port,
- insist, ssl, transport, connect_timeout, login_method):
+ insist, ssl, transport, connect_timeout, login_method, heartbeat):
+ transport = transport or 'amqp'
+ if transport == 'amqp' and supports_librabbitmq():
+ transport = 'librabbitmq'
self.hostname = hostname
self.userid = userid
self.password = password
@@ -138,6 +149,7 @@ class Connection(object):
self.connect_timeout = connect_timeout
self.ssl = ssl
self.transport_cls = transport
+ self.heartbeat = heartbeat
def _debug(self, msg, ident='[Kombu connection:0x%(id)x] ', **kwargs):
if self._logger: # pragma: no cover
@@ -159,6 +171,20 @@ class Connection(object):
'[Kombu channel:%(channel_id)s] ')
return chan
+ def heartbeat_check(self, rate=2):
+ """Verify that hartbeats are sent and received.
+
+ If the current transport does not support heartbeats then
+ this is a noop operation.
+
+ :keyword rate: Rate is how often the tick is called
+ compared to the actual heartbeat value. E.g. if
+ the heartbeat is set to 3 seconds, and the tick
+ is called every 3 / 2 seconds, then the rate is 2.
+
+ """
+ return self.transport.heartbeat_check(self.connection, rate=rate)
+
def drain_events(self, **kwargs):
"""Wait for a single event from the server.
@@ -379,11 +405,12 @@ class Connection(object):
def clone(self, **kwargs):
"""Create a copy of the connection with the same connection
settings."""
- return self.__class__(**dict(self._info(), **kwargs))
+ return self.__class__(**dict(self._info(resolve=False), **kwargs))
- def _info(self):
- transport_cls = self.transport_cls or 'amqp'
- transport_cls = {AMQP_ALIAS: 'amqp'}.get(transport_cls, transport_cls)
+ def _info(self, resolve=True):
+ transport_cls = self.transport_cls
+ if resolve:
+ transport_cls = RESOLVE_ALIASES.get(transport_cls, transport_cls)
D = self.transport.default_connection_params
hostname = self.hostname or D.get('hostname')
if self.uri_prefix:
@@ -399,7 +426,8 @@ class Connection(object):
('connect_timeout', self.connect_timeout),
('transport_options', self.transport_options),
('login_method', self.login_method or D.get('login_method')),
- ('uri_prefix', self.uri_prefix))
+ ('uri_prefix', self.uri_prefix),
+ ('heartbeat', self.heartbeat))
return info
def info(self):
@@ -631,8 +659,12 @@ class Connection(object):
return self.transport.eventmap(self.connection)
@property
+ def supports_heartbeats(self):
+ return self.transport.supports_heartbeats
+
+ @property
def is_evented(self):
- return getattr(self.transport, 'on_poll_start', None)
+ return self.transport.supports_ev
BrokerConnection = Connection
diff --git a/kombu/messaging.py b/kombu/messaging.py
index c441df4c..c177c088 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -20,6 +20,9 @@ from .utils import maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
+# XXX compat attribute
+entry_to_queue = Queue.from_dict
+
class Producer(object):
"""Message Producer.
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index 8859dc35..49b4d4f4 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -101,6 +101,7 @@ class Node(object):
arguments = body.get('arguments')
if not destination or self.hostname in destination:
return self.dispatch(method, arguments, reply_to)
+ dispatch_from_message = handle_message
def reply(self, data, exchange, routing_key, **kwargs):
self.mailbox._publish_reply(data, exchange, routing_key,
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 94a57ae1..5942408b 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -11,6 +11,7 @@ Serialization utilities.
from __future__ import absolute_import
import codecs
+import os
import sys
import pickle as pypickle
@@ -57,16 +58,24 @@ else:
# cPickle.loads does not support buffer() objects,
# but we can just create a StringIO and use load.
if sys.version_info[0] == 3:
- from io import StringIO
+ from io import BytesIO
else:
try:
- from cStringIO import StringIO # noqa
+ from cStringIO import StringIO as BytesIO # noqa
except ImportError:
- from StringIO import StringIO # noqa
+ from StringIO import StringIO as BytesIO # noqa
+
+#: Kombu requires Python 2.5 or later so we use protocol 2 by default.
+#: There's a new protocol (3) but this is only supported by Python 3.
+pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))
+
+#: Kombu requires Python 2.5 or later so we use protocol 2 by default.
+#: There's a new protocol (3) but this is only supported by Python 3.
+pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))
def pickle_loads(s, load=pickle_load):
- return load(StringIO(s))
+ return load(BytesIO(s))
class SerializerRegistry(object):
@@ -327,7 +336,11 @@ else:
def register_pickle():
"""The fastest serialization method, but restricts
you to python clients."""
- registry.register('pickle', pickle.dumps, unpickle,
+
+ def dumps(obj, dumper=pickle.dumps):
+ return dumper(obj, protocol=pickle_protocol)
+
+ registry.register('pickle', dumps, unpickle,
content_type='application/x-python-serialize',
content_encoding='binary')
diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py
index 1a95a661..c4b8bd81 100644
--- a/kombu/tests/test_serialization.py
+++ b/kombu/tests/test_serialization.py
@@ -7,7 +7,7 @@ import sys
from kombu.serialization import (registry, register, SerializerNotInstalled,
raw_encode, register_yaml, register_msgpack,
- decode, bytes_t, pickle,
+ decode, bytes_t, pickle, pickle_protocol,
unregister, register_pickle)
from .utils import TestCase
@@ -37,7 +37,7 @@ json_data = ('{"int": 10, "float": 3.1415926500000002, '
'th\\u00e9 lazy dog"}')
# Pickle serialization tests
-pickle_data = pickle.dumps(py_data)
+pickle_data = pickle.dumps(py_data, protocol=pickle_protocol)
# YAML serialization tests
yaml_data = ('float: 3.1415926500000002\nint: 10\n'
@@ -204,9 +204,9 @@ class test_Serialization(TestCase):
content_encoding='binary'))
def test_pickle_encode(self):
- self.assertEqual(pickle_data,
- registry.encode(py_data,
- serializer='pickle')[-1])
+ self.assertEqual(pickle.loads(pickle_data),
+ pickle.loads(registry.encode(py_data,
+ serializer='pickle')[-1]))
def test_register(self):
register(None, None, None, None)
diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py
new file mode 100644
index 00000000..b48ec153
--- /dev/null
+++ b/kombu/tests/transport/test_pyamqp.py
@@ -0,0 +1,162 @@
+from __future__ import absolute_import
+
+import sys
+
+from nose import SkipTest
+
+try:
+ import amqp # noqa
+except ImportError:
+ pyamqp = None # noqa
+else:
+ from kombu.transport import pyamqp
+from kombu.connection import Connection
+
+from kombu.tests.utils import TestCase
+from kombu.tests.utils import mask_modules, Mock
+
+
+class MockConnection(dict):
+
+ def __setattr__(self, key, value):
+ self[key] = value
+
+
+class test_Channel(TestCase):
+
+ def setUp(self):
+ if pyamqp is None:
+ raise SkipTest('py-amqp not installed')
+
+ class Channel(pyamqp.Channel):
+ wait_returns = []
+
+ def _x_open(self, *args, **kwargs):
+ pass
+
+ def wait(self, *args, **kwargs):
+ return self.wait_returns
+
+ def _send_method(self, *args, **kwargs):
+ pass
+
+ self.conn = Mock()
+ self.conn.channels = {}
+ self.channel = Channel(self.conn, 0)
+
+ def test_init(self):
+ self.assertFalse(self.channel.no_ack_consumers)
+
+ def test_prepare_message(self):
+ x = self.channel.prepare_message('foobar', 10,
+ 'application/data', 'utf-8',
+ properties={})
+ self.assertTrue(x)
+
+ def test_message_to_python(self):
+ message = Mock()
+ message.headers = {}
+ message.properties = {}
+ self.assertTrue(self.channel.message_to_python(message))
+
+ def test_close_resolves_connection_cycle(self):
+ self.assertIsNotNone(self.channel.connection)
+ self.channel.close()
+ self.assertIsNone(self.channel.connection)
+
+ def test_basic_consume_registers_ack_status(self):
+ self.channel.wait_returns = 'my-consumer-tag'
+ self.channel.basic_consume('foo', no_ack=True)
+ self.assertIn('my-consumer-tag', self.channel.no_ack_consumers)
+
+ self.channel.wait_returns = 'other-consumer-tag'
+ self.channel.basic_consume('bar', no_ack=False)
+ self.assertNotIn('other-consumer-tag', self.channel.no_ack_consumers)
+
+ self.channel.basic_cancel('my-consumer-tag')
+ self.assertNotIn('my-consumer-tag', self.channel.no_ack_consumers)
+
+
+class test_Transport(TestCase):
+
+ def setUp(self):
+ if pyamqp is None:
+ raise SkipTest('py-amqp not installed')
+ self.connection = Connection('pyamqp://')
+ self.transport = self.connection.transport
+
+ def test_create_channel(self):
+ connection = Mock()
+ self.transport.create_channel(connection)
+ connection.channel.assert_called_with()
+
+ def test_drain_events(self):
+ connection = Mock()
+ self.transport.drain_events(connection, timeout=10.0)
+ connection.drain_events.assert_called_with(timeout=10.0)
+
+ def test_dnspython_localhost_resolve_bug(self):
+
+ class Conn(object):
+
+ def __init__(self, **kwargs):
+ vars(self).update(kwargs)
+
+ self.transport.Connection = Conn
+ self.transport.client.hostname = 'localhost'
+ conn1 = self.transport.establish_connection()
+ self.assertEqual(conn1.host, '127.0.0.1:5672')
+
+ self.transport.client.hostname = 'example.com'
+ conn2 = self.transport.establish_connection()
+ self.assertEqual(conn2.host, 'example.com:5672')
+
+ def test_close_connection(self):
+ connection = Mock()
+ connection.client = Mock()
+ self.transport.close_connection(connection)
+
+ self.assertIsNone(connection.client)
+ connection.close.assert_called_with()
+
+ def test_verify_connection(self):
+ connection = Mock()
+ connection.channels = None
+ self.assertFalse(self.transport.verify_connection(connection))
+
+ connection.channels = {1: 1, 2: 2}
+ self.assertTrue(self.transport.verify_connection(connection))
+
+ @mask_modules('ssl')
+ def test_import_no_ssl(self):
+ pm = sys.modules.pop('amqp.connection')
+ try:
+ from amqp.connection import SSLError
+ self.assertEqual(SSLError.__module__, 'amqp.connection')
+ finally:
+ if pm is not None:
+ sys.modules['amqp.connection'] = pm
+
+
+class test_pyamqp(TestCase):
+
+ def setUp(self):
+ if pyamqp is None:
+ raise SkipTest('py-amqp not installed')
+
+ def test_default_port(self):
+
+ class Transport(pyamqp.Transport):
+ Connection = MockConnection
+
+ c = Connection(port=None, transport=Transport).connect()
+ self.assertEqual(c['host'],
+ '127.0.0.1:%s' % (Transport.default_port, ))
+
+ def test_custom_port(self):
+
+ class Transport(pyamqp.Transport):
+ Connection = MockConnection
+
+ c = Connection(port=1337, transport=Transport).connect()
+ self.assertEqual(c['host'], '127.0.0.1:1337')
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index e08c4c14..af716ede 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -66,7 +66,7 @@ class test_QoS(TestCase):
self.q.append(i + 1, uuid())
self.assertFalse(self.q.can_consume())
- tag1 = self.q._delivered.keys()[0]
+ tag1 = iter(self.q._delivered).next()
self.q.ack(tag1)
self.assertTrue(self.q.can_consume())
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index bfed8aa0..1795fba3 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -160,13 +160,14 @@ class Channel(virtual.Channel):
def _new_queue(self, queue, **kwargs):
"""Ensures a queue exists in SQS."""
- queue = self.queue_name_prefix + queue
+ # Translate to SQS name for consistency with initial
+ # _queue_cache population.
+ queue = self.entity_name(self.queue_name_prefix + queue)
try:
return self._queue_cache[queue]
except KeyError:
q = self._queue_cache[queue] = self.sqs.create_queue(
- self.entity_name(queue),
- self.visibility_timeout)
+ queue, self.visibility_timeout)
return q
def _queue_bind(self, *args):
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index f3d85ad5..63de88d8 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -12,19 +12,17 @@ from __future__ import absolute_import
import sys
-from kombu.syn import detect_environment
+from kombu.syn import _detect_environment
-DEFAULT_TRANSPORT = 'amqp'
-AMQP_TRANSPORT = 'kombu.transport.amqplib.Transport'
-AMQP_ALIAS = 'librabbitmq'
-if detect_environment() == 'default':
- try:
- import librabbitmq # noqa
- AMQP_TRANSPORT = 'kombu.transport.librabbitmq.Transport' # noqa
- AMQP_ALIAS = 'amqp' # noqa
- except ImportError:
- pass
+def supports_librabbitmq():
+ if _detect_environment() == 'default':
+ try:
+ import librabbitmq # noqa
+ return True
+ except ImportError:
+ pass
+ return False
def _ghettoq(name, new, alias=None):
@@ -48,7 +46,8 @@ def _ghettoq(name, new, alias=None):
TRANSPORT_ALIASES = {
- 'amqp': AMQP_TRANSPORT,
+ 'amqp': 'kombu.transport.amqplib.Transport',
+ 'pyamqp': 'kombu.transport.pyamqp.Transport',
'amqplib': 'kombu.transport.amqplib.Transport',
'librabbitmq': 'kombu.transport.librabbitmq.Transport',
'pika': 'kombu.transport.pika2.Transport',
@@ -70,6 +69,8 @@ TRANSPORT_ALIASES = {
'ghettoq.taproot.Beanstalk': _ghettoq('Beanstalk', 'beanstalk'),
'ghettoq.taproot.CouchDB': _ghettoq('CouchDB', 'couchdb'),
'filesystem': 'kombu.transport.filesystem.Transport',
+ 'zeromq': 'kombu.transport.zmq.Transport',
+ 'zmq': 'kombu.transport.zmq.Transport',
}
_transport_cache = {}
@@ -103,7 +104,6 @@ def get_transport_cls(transport=None):
the alias table will be consulted.
"""
- transport = transport or DEFAULT_TRANSPORT
if transport not in _transport_cache:
_transport_cache[transport] = _get_transport_cls(transport)
return _transport_cache[transport]
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index f87269af..57304254 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -10,6 +10,7 @@ amqplib transport.
"""
from __future__ import absolute_import
+import errno
import socket
try:
@@ -17,6 +18,7 @@ try:
except ImportError:
class SSLError(Exception): # noqa
pass
+from struct import unpack
from amqplib import client_0_8 as amqp
from amqplib.client_0_8 import transport
@@ -40,19 +42,47 @@ transport.AMQP_PROTOCOL_HEADER = str_to_bytes('AMQP\x01\x01\x08\x00')
# - fixes warnings when socket is not connected.
-class _TCPTransport(transport.TCPTransport):
+class TCPTransport(transport.TCPTransport):
+
+ def read_frame(self):
+ frame_type, channel, size = unpack('>BHI', self._read(7, True))
+ payload = self._read(size)
+ ch = ord(self._read(1))
+ if ch == 206: # '\xce'
+ return frame_type, channel, payload
+ else:
+ raise Exception(
+ 'Framing Error, received 0x%02x while expecting 0xce' % ch)
+
+ def _read(self, n, initial=False):
+ while len(self._read_buffer) < n:
+ try:
+ s = self.sock.recv(65536)
+ except socket.error, exc:
+ if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ self._read_buffer += s
+
+ result = self._read_buffer[:n]
+ self._read_buffer = self._read_buffer[n:]
+
+ return result
def __del__(self):
try:
- transport._AbstractTransport.__del__(self)
- except socket.error:
+ self.close()
+ except Exception:
pass
finally:
self.sock = None
-transport.TCPTransport = _TCPTransport
+
+transport.TCPTransport = TCPTransport
-class _SSLTransport(transport.SSLTransport):
+class SSLTransport(transport.SSLTransport):
def __init__(self, host, connect_timeout, ssl):
if isinstance(ssl, dict):
@@ -60,7 +90,41 @@ class _SSLTransport(transport.SSLTransport):
self.sslobj = None
transport._AbstractTransport.__init__(self, host, connect_timeout)
-transport.SSLTransport = _SSLTransport
+
+ def read_frame(self):
+ frame_type, channel, size = unpack('>BHI', self._read(7, True))
+ payload = self._read(size)
+ ch = ord(self._read(1))
+ if ch == 206: # '\xce'
+ return frame_type, channel, payload
+ else:
+ raise Exception(
+ 'Framing Error, received 0x%02x while expecting 0xce' % ch)
+
+ def _read(self, n, initial=False):
+ result = ''
+
+ while len(result) < n:
+ try:
+ s = self.sslobj.read(n - len(result))
+ except socket.error, exc:
+ if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ result += s
+
+ return result
+
+ def __del__(self):
+ try:
+ self.close()
+ except Exception:
+ pass
+ finally:
+ self.sock = None
+transport.SSLTransport = SSLTransport
class Connection(amqp.Connection): # pragma: no cover
@@ -250,6 +314,7 @@ class Transport(base.Transport):
nb_keep_draining = True
driver_name = "amqplib"
driver_type = "amqp"
+ supports_ev = True
def __init__(self, client, **kwargs):
self.client = client
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index bc934a84..b4b48687 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -201,6 +201,13 @@ class Transport(object):
#: Name of driver library (e.g. 'amqplib', 'redis', 'beanstalkc').
driver_name = 'N/A'
+ #: Whether this transports support heartbeats,
+ #: and that the :meth:`heartbeat_check` method has any effect.
+ supports_heartbeats = False
+
+ #: Set to true if the transport supports the AIO interface.
+ supports_ev = False
+
def __init__(self, client, **kwargs):
self.client = client
@@ -219,6 +226,9 @@ class Transport(object):
def drain_events(self, connection, **kwargs):
raise NotImplementedError('Subclass responsibility')
+ def heartbeat_check(self, connection, rate=2):
+ pass
+
def driver_version(self):
return 'N/A'
@@ -227,6 +237,15 @@ class Transport(object):
Unconvenient to use, and limited transport support."""
return {}
+ def on_poll_init(self, poller):
+ pass
+
+ def on_poll_start(self):
+ raise NotImplementedError('transport: no eventloop support')
+
+ def on_poll_empty(self):
+ pass
+
def verify_connection(self, connection):
return True
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index 6745bc1e..0cdd2f98 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -105,8 +105,9 @@ class Channel(virtual.Channel):
def _open(self):
conninfo = self.connection.client
+ host = conninfo.hostname or 'localhost'
port = conninfo.port or DEFAULT_PORT
- conn = beanstalkc.Connection(host=conninfo.hostname, port=port)
+ conn = beanstalkc.Connection(host=host, port=port)
conn.connect()
return conn
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index 62baec96..a3be2d3c 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -39,9 +39,9 @@ class Message(base.Message):
body=body,
delivery_info=info,
properties=props,
- delivery_tag=info['delivery_tag'],
- content_type=props['content_type'],
- content_encoding=props['content_encoding'],
+ delivery_tag=info.get('delivery_tag'),
+ content_type=props.get('content_type'),
+ content_encoding=props.get('content_encoding'),
headers=props.get('headers'))
@@ -77,6 +77,7 @@ class Transport(base.Transport):
driver_type = 'amqp'
driver_name = 'librabbitmq'
+ supports_ev = True
nb_keep_draining = True
def __init__(self, client, **kwargs):
diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py
index bf1ab721..aa5ed029 100644
--- a/kombu/transport/pika2.py
+++ b/kombu/transport/pika2.py
@@ -67,8 +67,8 @@ class Channel(blocking.BlockingChannel, base.StdChannel):
return None, method, method._properties, method._body
def queue_purge(self, queue=None, nowait=False):
- return super(Channel, self).queue_purge(queue=queue,
- nowait=nowait).message_count
+ return super(Channel, self).\
+ queue_purge(queue=queue, nowait=nowait).method.message_count
def basic_publish(self, message, exchange, routing_key, mandatory=False,
immediate=False):
@@ -103,8 +103,7 @@ class Channel(blocking.BlockingChannel, base.StdChannel):
properties = spec.BasicProperties(priority=priority,
content_type=content_type,
content_encoding=content_encoding,
- headers=headers,
- **properties)
+ headers=headers)
return body, properties
def message_to_python(self, raw_message):
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
new file mode 100644
index 00000000..2ad938c7
--- /dev/null
+++ b/kombu/transport/pyamqp.py
@@ -0,0 +1,136 @@
+"""
+kombu.transport.pyamqp
+======================
+
+pure python amqp transport.
+
+:copyright: (c) 2009 - 2012 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
+from __future__ import absolute_import
+
+import amqp
+
+from kombu.exceptions import StdChannelError
+from kombu.utils.amq_manager import get_manager
+
+from . import base
+
+DEFAULT_PORT = 5672
+
+
+class Message(base.Message):
+
+ def __init__(self, channel, msg, **kwargs):
+ props = msg.properties
+ super(Message, self).__init__(channel,
+ body=msg.body,
+ delivery_tag=msg.delivery_tag,
+ content_type=props.get('content_type'),
+ content_encoding=props.get('content_encoding'),
+ delivery_info=msg.delivery_info,
+ properties=msg.properties,
+ headers=props.get('application_headers') or {},
+ **kwargs)
+
+
+class Channel(amqp.Channel, base.StdChannel):
+ Message = Message
+
+ def prepare_message(self, message_data, priority=None,
+ content_type=None, content_encoding=None, headers=None,
+ properties=None):
+ """Encapsulate data into a AMQP message."""
+ return amqp.Message(message_data, priority=priority,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ application_headers=headers,
+ **properties)
+
+ def message_to_python(self, raw_message):
+ """Convert encoded message body back to a Python value."""
+ return self.Message(self, raw_message)
+
+
+class Connection(amqp.Connection):
+ Channel = Channel
+
+
+class Transport(base.Transport):
+ Connection = Connection
+
+ default_port = DEFAULT_PORT
+
+ # it's very annoying that pyamqp sometimes raises AttributeError
+ # if the connection is lost, but nothing we can do about that here.
+ connection_errors = amqp.Connection.connection_errors
+ channel_errors = (StdChannelError, ) + amqp.Connection.channel_errors
+
+ nb_keep_draining = True
+ driver_name = "py-amqp"
+ driver_type = "amqp"
+ supports_heartbeats = True
+ supports_ev = True
+
+ def __init__(self, client, **kwargs):
+ self.client = client
+ self.default_port = kwargs.get("default_port") or self.default_port
+
+ def create_channel(self, connection):
+ return connection.channel()
+
+ def drain_events(self, connection, **kwargs):
+ return connection.drain_events(**kwargs)
+
+ def establish_connection(self):
+ """Establish connection to the AMQP broker."""
+ conninfo = self.client
+ for name, default_value in self.default_connection_params.items():
+ if not getattr(conninfo, name, None):
+ setattr(conninfo, name, default_value)
+ if conninfo.hostname == 'localhost':
+ conninfo.hostname = '127.0.0.1'
+ conn = self.Connection(host=conninfo.host,
+ userid=conninfo.userid,
+ password=conninfo.password,
+ login_method=conninfo.login_method,
+ virtual_host=conninfo.virtual_host,
+ insist=conninfo.insist,
+ ssl=conninfo.ssl,
+ connect_timeout=conninfo.connect_timeout,
+ heartbeat=conninfo.heartbeat)
+ conn.client = self.client
+ return conn
+
+ def close_connection(self, connection):
+ """Close the AMQP broker connection."""
+ connection.client = None
+ connection.close()
+
+ def is_alive(self, connection):
+ return connection.is_alive()
+
+ def verify_connection(self, connection):
+ return connection.channels is not None and self.is_alive(connection)
+
+ def eventmap(self, connection):
+ return {connection.sock: self.client.drain_nowait}
+
+ def on_poll_init(self, poller):
+ pass
+
+ def on_poll_start(self):
+ return {}
+
+ def heartbeat_check(self, connection, rate=2):
+ return connection.heartbeat_tick(rate=rate)
+
+ @property
+ def default_connection_params(self):
+ return {'userid': 'guest', 'password': 'guest',
+ 'port': self.default_port,
+ 'hostname': 'localhost', 'login_method': 'AMQPLAIN'}
+
+ def get_manager(self, *args, **kwargs):
+ return get_manager(self.client, *args, **kwargs)
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 9125c55e..5b826bfe 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -76,7 +76,7 @@ class QoS(virtual.QoS):
super(QoS, self).append(message, delivery_tag)
def restore_unacked(self):
- for tag in self._delivered.iterkeys():
+ for tag in self._delivered:
self.restore_by_tag(tag)
self._delivered.clear()
@@ -111,7 +111,7 @@ class QoS(virtual.QoS):
M, EX, RK = loads(p)
self.channel._do_restore_message(M, EX, RK)
- @cached_property
+ @property
def client(self):
return self.channel._avail_client
@@ -198,6 +198,17 @@ class MultiChannelPoller(object):
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
+ def on_poll_init(self, poller):
+ self.poller = poller
+ for channel in self._channels:
+ channel.qos.restore_visible()
+
+ def on_poll_empty(self):
+ for channel in self._channels:
+ if channel.active_queues:
+ # only need to do this once, as they are not local to channel.
+ return channel.qos.restore_visible()
+
def handle_event(self, fileno, event):
if event & READ:
chan, type = self._fd_to_chan[fileno]
@@ -223,9 +234,8 @@ class MultiChannelPoller(object):
# - no new data, so try to restore messages.
# - reset active redis commands.
- for channel in self._channels:
- if channel.active_queues:
- channel.qos.restore_visible()
+ self.on_poll_empty()
+
raise Empty()
@property
@@ -289,7 +299,7 @@ class Channel(virtual.Channel):
def _restore(self, message, payload=None):
tag = message.delivery_tag
- P, _ = self.client.pipeline() \
+ P, _ = self._avail_client.pipeline() \
.hget(self.unacked_key, tag) \
.hdel(self.unacked_key, tag) \
.execute()
@@ -460,7 +470,7 @@ class Channel(virtual.Channel):
def get_table(self, exchange):
key = self.keyprefix_queue % exchange
- values = self.client.smembers(key)
+ values = self._avail_client.smembers(key)
if not values:
raise InconsistencyError(
'Queue list empty or key does not exist: %r' % (
@@ -481,7 +491,7 @@ class Channel(virtual.Channel):
self.connection.cycle.discard(self)
# delete fanout bindings
- for queue in self._fanout_queues.iterkeys():
+ for queue in self._fanout_queues:
if queue in self.auto_delete_queues:
self.queue_delete(queue)
@@ -599,6 +609,7 @@ class Transport(virtual.Transport):
polling_interval = None # disable sleep between unsuccessful polls.
default_port = DEFAULT_PORT
+ supports_ev = True
driver_type = 'redis'
driver_name = 'redis'
@@ -615,7 +626,7 @@ class Transport(virtual.Transport):
def on_poll_init(self, poller):
"""Called when hub starts."""
- self.cycle.poller = poller
+ self.cycle.on_poll_init(poller)
def on_poll_start(self):
"""Called by hub before each ``poll()``"""
@@ -623,6 +634,9 @@ class Transport(virtual.Transport):
cycle.on_poll_start()
return dict((fd, self.handle_event) for fd in cycle.fds)
+ def on_poll_empty(self):
+ self.cycle.on_poll_empty()
+
def handle_event(self, fileno, event):
"""Handle AIO event for one of our file descriptors."""
ret = self.cycle.handle_event(fileno, event)
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 6f8087b9..fe2983ed 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -433,7 +433,7 @@ class Channel(AbstractChannel, base.StdChannel):
self._queue_bind(exchange, *meta)
def list_bindings(self):
- for exchange in self.get_exchanges():
+ for exchange in self.state.exchanges:
table = self.get_table(exchange)
for routing_key, pattern, queue in table:
yield queue, exchange, routing_key
@@ -512,7 +512,7 @@ class Channel(AbstractChannel, base.StdChannel):
self.qos.prefetch_count = prefetch_count
def get_exchanges(self):
- return self.state.exchanges.keys()
+ return list(self.state.exchanges)
def get_table(self, exchange):
"""Get table of bindings for `exchange`."""
diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py
new file mode 100644
index 00000000..369e8ff8
--- /dev/null
+++ b/kombu/transport/zmq.py
@@ -0,0 +1,290 @@
+"""
+kombu.transport.zmq
+===================
+
+ZeroMQ transport.
+
+"""
+from __future__ import absolute_import
+
+import errno
+import os
+import socket
+
+from cPickle import loads, dumps
+from Queue import Empty
+
+import zmq
+
+from kombu.exceptions import StdChannelError
+from kombu.log import get_logger
+from kombu.utils import cached_property
+from kombu.utils.eventio import poll, READ
+
+from . import virtual
+
+logger = get_logger('kombu.transport.zmq')
+
+DEFAULT_PORT = 5555
+DEFAULT_HWM = 128
+DEFAULT_INCR = 1
+
+
+class MultiChannelPoller(object):
+ eventflags = READ
+
+ def __init__(self):
+ # active channels
+ self._channels = set()
+ # file descriptor -> channel map
+ self._fd_to_chan = {}
+ # poll implementation (epoll/kqueue/select)
+ self.poller = poll()
+
+ def close(self):
+ for fd in self._fd_to_chan:
+ try:
+ self.poller.unregister(fd)
+ except KeyError:
+ pass
+ self._channels.clear()
+ self._fd_to_chan.clear()
+ self.poller = None
+
+ def add(self, channel):
+ self._channels.add(channel)
+
+ def discard(self, channel):
+ self._channels.discard(channel)
+ self._fd_to_chan.pop(channel.client.connection.fd, None)
+
+ def _register(self, channel):
+ conn = channel.client.connection
+ self._fd_to_chan[conn.fd] = channel
+ self.poller.register(conn.fd, self.eventflags)
+
+ def on_poll_start(self):
+ for channel in self._channels:
+ self._register(channel)
+
+ def handle_event(self, fileno, event):
+ chan = self._fd_to_chan[fileno]
+ return (chan.drain_events(), chan)
+
+ def get(self, timeout=None):
+ self.on_poll_start()
+
+ events = self.poller.poll(timeout)
+ for fileno, event in events or []:
+ return self.handle_event(fileno, event)
+
+ raise Empty()
+
+ @property
+ def fds(self):
+ return self._fd_to_chan
+
+
+class Client(object):
+ def __init__(self, uri='tcp://127.0.0.1', port=DEFAULT_PORT,
+ hwm=DEFAULT_HWM, swap_size=None, enable_sink=True, context=None):
+ try:
+ scheme, parts = uri.split('://')
+ except ValueError:
+ scheme = 'tcp'
+ parts = uri
+ endpoints = parts.split(';')
+
+ if scheme != 'tcp':
+ raise NotImplementedError('Currently only TCP can be used')
+
+ self.context = context or zmq.Context.instance()
+
+ if enable_sink:
+ self.sink = self.context.socket(zmq.PULL)
+ self.sink.bind('tcp://*:%s' % port)
+ else:
+ self.sink = None
+
+ self.vent = self.context.socket(zmq.PUSH)
+ self.vent.setsockopt(zmq.HWM, hwm)
+ if swap_size:
+ self.vent.setsockopt(zmq.SWAP, swap_size)
+
+ for endpoint in endpoints:
+ if scheme == 'tcp' and ':' not in endpoint:
+ endpoint += ':' + str(DEFAULT_PORT)
+
+ endpoint = ''.join([scheme, '://', endpoint])
+
+ self.connect(endpoint)
+
+ def connect(self, endpoint):
+ self.vent.connect(endpoint)
+
+ def get(self, queue=None, timeout=None):
+ try:
+ return self.sink.recv(flags=zmq.NOBLOCK)
+ except zmq.ZMQError, e:
+ if e.errno == zmq.EAGAIN:
+ raise socket.error(errno.EAGAIN, e.strerror)
+ else:
+ raise
+
+ def put(self, queue, message, **kwargs):
+ return self.vent.send(message)
+
+ def close(self):
+ if self.sink and not self.sink.closed:
+ self.sink.close()
+ if not self.vent.closed:
+ self.vent.close()
+
+ @property
+ def connection(self):
+ if self.sink:
+ return self.sink
+ return self.vent
+
+
+class Channel(virtual.Channel):
+ Client = Client
+
+ hwm = DEFAULT_HWM
+ swap_size = None
+ enable_sink = True
+ port_incr = DEFAULT_INCR
+
+ from_transport_options = (virtual.Channel.from_transport_options
+ + ('hwm',
+ 'swap_size',
+ 'enable_sink',
+ 'port_incr'))
+
+ def __init__(self, *args, **kwargs):
+ super_ = super(Channel, self)
+ super_.__init__(*args, **kwargs)
+
+ # Evaluate socket
+ self.client.connection.closed
+
+ self.connection.cycle.add(self)
+ self.connection_errors = self.connection.connection_errors
+
+ def _get(self, queue, timeout=None):
+ try:
+ return loads(self.client.get(queue, timeout))
+ except socket.error, exc:
+ if exc.errno == errno.EAGAIN and timeout != 0:
+ raise Empty()
+ else:
+ raise
+
+ def _put(self, queue, message, **kwargs):
+ self.client.put(queue, dumps(message, -1), **kwargs)
+
+ def _purge(self, queue):
+ return 0
+
+ def _poll(self, cycle, timeout=None):
+ return cycle.get(timeout=timeout)
+
+ def close(self):
+ if not self.closed:
+ self.connection.cycle.discard(self)
+ try:
+ self.__dict__['client'].close()
+ except KeyError:
+ pass
+ super(Channel, self).close()
+
+ def _prepare_port(self, port):
+ return (port + self.channel_id - 1) * self.port_incr
+
+ def _create_client(self):
+ conninfo = self.connection.client
+ port = self._prepare_port(conninfo.port or DEFAULT_PORT)
+ return self.Client(uri=conninfo.hostname or 'tcp://127.0.0.1',
+ port=port,
+ hwm=self.hwm,
+ swap_size=self.swap_size,
+ enable_sink=self.enable_sink,
+ context=self.connection.context)
+
+ @cached_property
+ def client(self):
+ return self._create_client()
+
+
+class Transport(virtual.Transport):
+ Channel = Channel
+
+ default_port = DEFAULT_PORT
+ driver_type = 'zeromq'
+ driver_name = 'zmq'
+
+ connection_errors = (zmq.ZMQError,)
+ channel_errors = (zmq.ZMQError, StdChannelError,)
+
+ supports_ev = True
+ polling_interval = None
+ nb_keep_draining = True
+
+ def __init__(self, *args, **kwargs):
+ super(Transport, self).__init__(*args, **kwargs)
+
+ self.cycle = MultiChannelPoller()
+
+ def driver_version(self):
+ return zmq.__version__
+
+ def on_poll_init(self, poller):
+ self.cycle.poller = poller
+
+ def on_poll_start(self):
+ cycle = self.cycle
+ cycle.on_poll_start()
+ return dict((fd, self.handle_event) for fd in cycle.fds)
+
+ def handle_event(self, fileno, event):
+ evt = self.cycle.handle_event(fileno, event)
+ self._handle_event(evt)
+
+ def drain_events(self, connection, timeout=None):
+ more_to_read = False
+ for channel in connection.channels:
+ try:
+ evt = channel.cycle.get(timeout=timeout)
+ except socket.error, e:
+ if e.errno == errno.EAGAIN:
+ continue
+ raise
+ else:
+ connection._handle_event((evt, channel))
+ more_to_read = True
+ if not more_to_read:
+ raise socket.error(errno.EAGAIN, os.strerror(errno.EAGAIN))
+
+ def _handle_event(self, evt):
+ item, channel = evt
+ message, queue = item
+ if not queue or queue not in self._callbacks:
+ raise KeyError(
+ "Received message for queue '%s' without consumers: %s" % (
+ queue, message))
+ self._callbacks[queue](message)
+
+ def establish_connection(self):
+ self.context.closed
+ return super(Transport, self).establish_connection()
+
+ def close_connection(self, connection):
+ super(Transport, self).close_connection(connection)
+ try:
+ connection.__dict__['context'].term()
+ except KeyError:
+ pass
+
+ @cached_property
+ def context(self):
+ return zmq.Context(1)
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 8e230afe..b402ff6c 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -153,7 +153,6 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
-
for retries, interval in enumerate(interval_range): # for infinity
try:
return fun(*args, **kwargs)
@@ -164,10 +163,10 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
callback()
if errback:
errback(exc, interval)
- for i in fxrange(stop=interval):
+ for i in fxrange(stop=interval or 1.0):
if i and callback:
callback()
- sleep(1.0)
+ sleep(i)
def emergency_dump_state(state, open_file=open, dump=None):
@@ -264,7 +263,7 @@ def reprkwargs(kwargs, sep=', ', fmt='%s=%s'):
return sep.join(fmt % (k, _safe_repr(v)) for k, v in kwargs.iteritems())
-def reprcall(name, args=(), kwargs=(), sep=', '):
+def reprcall(name, args=(), kwargs={}, sep=', '):
return '%s(%s%s%s)' % (name, sep.join(map(_safe_repr, args or ())),
(args and kwargs) and sep or '',
reprkwargs(kwargs, sep))
diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt
new file mode 100644
index 00000000..19049d44
--- /dev/null
+++ b/requirements/test-ci.txt
@@ -0,0 +1,3 @@
+coverage>=3.0
+PyYAML
+msgpack-python<0.2.0 # 0.2.0 dropped 2.5 support
diff --git a/requirements/test-jython.txt b/requirements/test-jython.txt
deleted file mode 100644
index 8931aca4..00000000
--- a/requirements/test-jython.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-nose
-nose-cover3
-unittest2>=0.5.0
-coverage>=3.0
-mock>=0.7.0
-simplejson
diff --git a/requirements/test-py3k.txt b/requirements/test-py3k.txt
index 87e69702..3b76ba50 100644
--- a/requirements/test-py3k.txt
+++ b/requirements/test-py3k.txt
@@ -1,5 +1,3 @@
distribute
nose
-nose-cover3
-coverage>=3.0
mock>=0.7.0
diff --git a/requirements/test-pypy.txt b/requirements/test-pypy.txt
deleted file mode 100644
index c0af83aa..00000000
--- a/requirements/test-pypy.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-nose
-nose-cover3
-unittest2>=0.5.0
-coverage>=3.0
-mock>=0.7.0
-simplejson
-PyYAML==3.09 # 3.10 dropped 2.4 support
diff --git a/requirements/test.txt b/requirements/test.txt
index 75350c84..5cf3124c 100644
--- a/requirements/test.txt
+++ b/requirements/test.txt
@@ -1,8 +1,4 @@
nose
nose-cover3
unittest2>=0.5.0
-coverage>=3.0
mock
-simplejson
-PyYAML
-msgpack-python<0.2.0 # 0.2.0 dropped 2.5 support
diff --git a/tox.ini b/tox.ini
index d7ea538d..de05bad6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -12,6 +12,7 @@ recreate = True
basepython = python3.2
changedir = .tox
deps = -r{toxinidir}/requirements/default.txt
+ -r{toxinidir}/requirements/test-ci.txt
commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
{envbindir}/easy_install -U distribute
{envbindir}/pip install \
@@ -39,6 +40,7 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
basepython = python2.7
deps = -r{toxinidir}/requirements/default.txt
-r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-ci.txt
commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
nosetests --with-xunit \
--xunit-file={toxinidir}/nosetests.xml \
@@ -50,6 +52,7 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
basepython = python2.6
deps = -r{toxinidir}/requirements/default.txt
-r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-ci.txt
commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
nosetests --with-xunit \
--xunit-file={toxinidir}/nosetests.xml \
@@ -61,6 +64,8 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
basepython = python2.5
deps = -r{toxinidir}/requirements/default.txt
-r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-py25.txt
+ -r{toxinidir}/requirements/test-ci.txt
commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
nosetests --with-xunit \
--xunit-file={toxinidir}/nosetests.xml \
@@ -71,7 +76,8 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
[testenv:pypy]
basepython = pypy
deps = -r{toxinidir}/requirements/default.txt
- -r{toxinidir}/requirements/test-pypy.txt
+ -r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-ci.txt
commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
nosetests --with-xunit \
--xunit-file={toxinidir}/nosetests.xml \
@@ -84,6 +90,8 @@ basepython = jython
recreate = True
where = .tox
deps = -r{toxinidir}/requirements/default.txt
- -r{toxinidir}/requirements/test-jython.txt
+ -r{toxinidir}/requirements/test.txt
+ -r{toxinidir}/requirements/test-py25.txt
+ -r{toxinidir}/requirements/test-ci.txt
commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
{toxinidir}/extra/release/jython-run-tests {toxinidir}