summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-04-17 15:19:29 +0100
committerAsk Solem <ask@celeryproject.org>2013-04-17 15:19:29 +0100
commit4146118de3bcbb63af226aef5887b931e0f10adc (patch)
treeaed26321402b43faeb95705dbb547451448c67c9
parent970e27db8e5d05a31574f4b90c151832d620f772 (diff)
parentb639fef1f55b39d1c862a0b57d9e4ab06ec77ad4 (diff)
downloadkombu-4146118de3bcbb63af226aef5887b931e0f10adc.tar.gz
Merge branch '2.5'
Conflicts: Changelog kombu/utils/eventio.py
-rw-r--r--AUTHORS2
-rw-r--r--Changelog21
-rw-r--r--funtests/setup.py54
-rw-r--r--funtests/tests/__init__.py2
-rw-r--r--funtests/tests/test_SQS.py12
-rw-r--r--funtests/tests/test_amqplib.py4
-rw-r--r--funtests/tests/test_beanstalk.py4
-rw-r--r--funtests/tests/test_couchdb.py4
-rw-r--r--funtests/tests/test_django.py20
-rw-r--r--funtests/tests/test_mongodb.py18
-rw-r--r--funtests/tests/test_pyamqp.py4
-rw-r--r--funtests/tests/test_redis.py4
-rw-r--r--funtests/tests/test_sqla.py8
-rw-r--r--funtests/tests/test_zookeeper.py4
-rw-r--r--funtests/transport.py56
-rw-r--r--kombu/entity.py13
-rw-r--r--kombu/messaging.py35
-rw-r--r--kombu/tests/__init__.py16
-rw-r--r--kombu/tests/test_entities.py5
-rw-r--r--kombu/tests/test_messaging.py6
-rw-r--r--kombu/transport/librabbitmq.py6
-rw-r--r--kombu/transport/redis.py2
-rw-r--r--kombu/utils/eventio.py32
23 files changed, 217 insertions, 115 deletions
diff --git a/AUTHORS b/AUTHORS
index f71d658e..54fe0a17 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -20,6 +20,7 @@ C Anthony Risinger <anthony+corvisa.com@xtfx.me>
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com>
+Dan LaMotte <lamotte85@gmail.com>
Dan McGee <dan@archlinux.org>
Dane Guempel <daneguempel@gmail.com>
David Clymer <david@zettazebra.com>
@@ -61,6 +62,7 @@ Petar Radosevic <petar@wunki.org>
Peter Hoffmann <tosh54@gmail.com>
Pierre Riteau <priteau@ci.uchicago.edu>
Rafael Duran Castaneda <rafadurancastaneda@gmail.com>
+Rafal Malinowski <malinowski@red-sky.pl>
Ralf Nyren <ralf-github@nyren.net>
Rob Ottaway <robottaway@gmail.com>
Rumyana Neykova <rumi.neykova@gmail.com>
diff --git a/Changelog b/Changelog
index 3c89d029..51fbc9a9 100644
--- a/Changelog
+++ b/Changelog
@@ -19,6 +19,27 @@
>>> from kombu import enable_insecure_serializers
>>> enable_insecure_serializers()
+.. _version-2.5.11:
+
+2.5.11
+======
+:release-date: TBA
+
+- Adds ``passive`` option to :class:`~kombu.Exchange`.
+
+ Contributed by Rafal Malinowski
+
+- librabbitmq: Now raises :exc:`NotImplementedError`
+ if ``ssl`` option enabled.
+
+ The librabbitmq library does not support ssl,
+ but you can use stunnel or change to the ``pyamqp://`` transport
+ instead.
+
+ Fix contributed by Dan LaMotte.
+
+- eventio: select implementation now removes bad file descriptors.
+
.. _version-2.5.10:
2.5.10
diff --git a/funtests/setup.py b/funtests/setup.py
index a7ad8b4e..9bea0b97 100644
--- a/funtests/setup.py
+++ b/funtests/setup.py
@@ -31,38 +31,38 @@ But you can execute the tests by running the command:
setup(
name='kombu-funtests',
- version="DEV",
- description="Functional test suite for Kombu",
- author="Ask Solem",
- author_email="ask@celeryproject.org",
- url="http://github.com/celery/kombu",
- platforms=["any"],
+ version='DEV',
+ description='Functional test suite for Kombu',
+ author='Ask Solem',
+ author_email='ask@celeryproject.org',
+ url='http://github.com/celery/kombu',
+ platforms=['any'],
packages=[],
data_files=[],
zip_safe=False,
- cmdclass={"install": no_install},
- test_suite="nose.collector",
+ cmdclass={'install': no_install},
+ test_suite='nose.collector',
build_requires=[
- "nose",
- "nose-cover3",
- "unittest2",
- "coverage>=3.0",
- "simplejson",
- "PyYAML",
- "msgpack-python",
- "pymongo",
- "couchdb",
- "kazoo",
- "beanstalkc",
- "kombu-sqlalchemy",
- "django",
- "django-kombu",
+ 'nose',
+ 'nose-cover3',
+ 'unittest2',
+ 'coverage>=3.0',
+ 'simplejson',
+ 'PyYAML',
+ 'msgpack-python',
+ 'pymongo',
+ 'couchdb',
+ 'kazoo',
+ 'beanstalkc',
+ 'kombu-sqlalchemy',
+ 'django',
+ 'django-kombu',
],
classifiers=[
- "Operating System :: OS Independent",
- "Programming Language :: Python",
- "License :: OSI Approved :: BSD License",
- "Intended Audience :: Developers",
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python',
+ 'License :: OSI Approved :: BSD License',
+ 'Intended Audience :: Developers',
],
- long_description="Do not install this package",
+ long_description='Do not install this package',
)
diff --git a/funtests/tests/__init__.py b/funtests/tests/__init__.py
index 0300fda8..41cbef67 100644
--- a/funtests/tests/__init__.py
+++ b/funtests/tests/__init__.py
@@ -1,8 +1,6 @@
import os
import sys
-print("HELLO")
-
sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
print(sys.path[0])
sys.path.insert(0, os.getcwd())
diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py
index c063a161..ebcbce2b 100644
--- a/funtests/tests/test_SQS.py
+++ b/funtests/tests/test_SQS.py
@@ -6,8 +6,8 @@ from funtests import transport
class test_SQS(transport.TransportCase):
- transport = "SQS"
- prefix = "sqs"
+ transport = 'SQS'
+ prefix = 'sqs'
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.
reliable_purge = False
@@ -15,10 +15,10 @@ class test_SQS(transport.TransportCase):
# even in simple cases.
def before_connect(self):
- if "AWS_ACCESS_KEY_ID" not in os.environ:
- raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID")
- if "AWS_SECRET_ACCESS_KEY" not in os.environ:
- raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY")
+ if 'AWS_ACCESS_KEY_ID' not in os.environ:
+ raise SkipTest('Missing envvar AWS_ACCESS_KEY_ID')
+ if 'AWS_SECRET_ACCESS_KEY' not in os.environ:
+ raise SkipTest('Missing envvar AWS_SECRET_ACCESS_KEY')
def after_connect(self, connection):
connection.channel().sqs
diff --git a/funtests/tests/test_amqplib.py b/funtests/tests/test_amqplib.py
index ec91250d..fc5b0d35 100644
--- a/funtests/tests/test_amqplib.py
+++ b/funtests/tests/test_amqplib.py
@@ -2,5 +2,5 @@ from funtests import transport
class test_amqplib(transport.TransportCase):
- transport = "amqplib"
- prefix = "amqplib"
+ transport = 'amqplib'
+ prefix = 'amqplib'
diff --git a/funtests/tests/test_beanstalk.py b/funtests/tests/test_beanstalk.py
index 5af87f7f..b0601edd 100644
--- a/funtests/tests/test_beanstalk.py
+++ b/funtests/tests/test_beanstalk.py
@@ -2,8 +2,8 @@ from funtests import transport
class test_beanstalk(transport.TransportCase):
- transport = "beanstalk"
- prefix = "beanstalk"
+ transport = 'beanstalk'
+ prefix = 'beanstalk'
event_loop_max = 10
message_size_limit = 47662
diff --git a/funtests/tests/test_couchdb.py b/funtests/tests/test_couchdb.py
index a6c5d6df..633e17b6 100644
--- a/funtests/tests/test_couchdb.py
+++ b/funtests/tests/test_couchdb.py
@@ -2,8 +2,8 @@ from funtests import transport
class test_couchdb(transport.TransportCase):
- transport = "couchdb"
- prefix = "couchdb"
+ transport = 'couchdb'
+ prefix = 'couchdb'
event_loop_max = 100
def after_connect(self, connection):
diff --git a/funtests/tests/test_django.py b/funtests/tests/test_django.py
index 86f551f3..7f509ea7 100644
--- a/funtests/tests/test_django.py
+++ b/funtests/tests/test_django.py
@@ -6,8 +6,8 @@ from funtests import transport
class test_django(transport.TransportCase):
- transport = "django"
- prefix = "django"
+ transport = 'django'
+ prefix = 'django'
event_loop_max = 10
def before_connect(self):
@@ -17,16 +17,16 @@ class test_django(transport.TransportCase):
try:
import djkombu # noqa
except ImportError:
- raise SkipTest("django-kombu not installed")
+ raise SkipTest('django-kombu not installed')
from django.conf import settings
if not settings.configured:
- settings.configure(DATABASE_ENGINE="sqlite3",
- DATABASE_NAME=":memory:",
- DATABASES={"default": {
- "ENGINE": "django.db.backends.sqlite3",
- "NAME": ":memory:"}},
- INSTALLED_APPS=("djkombu", ))
+ settings.configure(DATABASE_ENGINE='sqlite3',
+ DATABASE_NAME=':memory:',
+ DATABASES={'default': {
+ 'ENGINE': 'django.db.backends.sqlite3',
+ 'NAME': ':memory:'}},
+ INSTALLED_APPS=('djkombu', ))
from django.core.management import call_command
- call_command("syncdb")
+ call_command('syncdb')
setup_django()
diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py
index c51e3a5a..630c1c59 100644
--- a/funtests/tests/test_mongodb.py
+++ b/funtests/tests/test_mongodb.py
@@ -5,8 +5,8 @@ from funtests import transport
class test_mongodb(transport.TransportCase):
- transport = "mongodb"
- prefix = "mongodb"
+ transport = 'mongodb'
+ prefix = 'mongodb'
event_loop_max = 100
def after_connect(self, connection):
@@ -14,11 +14,11 @@ class test_mongodb(transport.TransportCase):
self.c = self.connection # shortcut
- def test_fanout(self, name="test_mongodb_fanout"):
+ def test_fanout(self, name='test_mongodb_fanout'):
c = self.connection
- self.e = Exchange(name, type="fanout")
+ self.e = Exchange(name, type='fanout')
self.q = Queue(name, exchange=self.e, routing_key=name)
- self.q2 = Queue(name + "2", exchange=self.e, routing_key=name + "2")
+ self.q2 = Queue(name + '2', exchange=self.e, routing_key=name + '2')
channel = c.default_channel
producer = Producer(channel, self.e)
@@ -27,9 +27,9 @@ class test_mongodb(transport.TransportCase):
self.q2(channel).declare()
for i in xrange(10):
- producer.publish({"foo": i}, routing_key=name)
+ producer.publish({'foo': i}, routing_key=name)
for i in xrange(10):
- producer.publish({"foo": i}, routing_key=name + "2")
+ producer.publish({'foo': i}, routing_key=name + '2')
_received1 = []
_received2 = []
@@ -55,7 +55,7 @@ class test_mongodb(transport.TransportCase):
# queue.delete
for i in xrange(10):
- producer.publish({"foo": i}, routing_key=name)
+ producer.publish({'foo': i}, routing_key=name)
self.assertTrue(self.q(channel).get())
self.q(channel).delete()
self.q(channel).declare()
@@ -63,7 +63,7 @@ class test_mongodb(transport.TransportCase):
# queue.purge
for i in xrange(10):
- producer.publish({"foo": i}, routing_key=name + "2")
+ producer.publish({'foo': i}, routing_key=name + '2')
self.assertTrue(self.q2(channel).get())
self.q2(channel).purge()
self.assertIsNone(self.q2(channel).get())
diff --git a/funtests/tests/test_pyamqp.py b/funtests/tests/test_pyamqp.py
index 8d493a80..5ca0c2e0 100644
--- a/funtests/tests/test_pyamqp.py
+++ b/funtests/tests/test_pyamqp.py
@@ -2,5 +2,5 @@ from funtests import transport
class test_pyamqp(transport.TransportCase):
- transport = "pyamqp"
- prefix = "pyamqp"
+ transport = 'pyamqp'
+ prefix = 'pyamqp'
diff --git a/funtests/tests/test_redis.py b/funtests/tests/test_redis.py
index 83aef8b8..50d84e2d 100644
--- a/funtests/tests/test_redis.py
+++ b/funtests/tests/test_redis.py
@@ -2,8 +2,8 @@ from funtests import transport
class test_redis(transport.TransportCase):
- transport = "redis"
- prefix = "redis"
+ transport = 'redis'
+ prefix = 'redis'
def after_connect(self, connection):
client = connection.channel().client
diff --git a/funtests/tests/test_sqla.py b/funtests/tests/test_sqla.py
index 2a01a4a2..49d3ea1a 100644
--- a/funtests/tests/test_sqla.py
+++ b/funtests/tests/test_sqla.py
@@ -4,13 +4,13 @@ from funtests import transport
class test_sqla(transport.TransportCase):
- transport = "sqlalchemy"
- prefix = "sqlalchemy"
+ transport = 'sqlalchemy'
+ prefix = 'sqlalchemy'
event_loop_max = 10
- connection_options = {"hostname": "sqlite://"}
+ connection_options = {'hostname': 'sqlite://'}
def before_connect(self):
try:
import sqlakombu # noqa
except ImportError:
- raise SkipTest("kombu-sqlalchemy not installed")
+ raise SkipTest('kombu-sqlalchemy not installed')
diff --git a/funtests/tests/test_zookeeper.py b/funtests/tests/test_zookeeper.py
index 0d1274ef..b30eee38 100644
--- a/funtests/tests/test_zookeeper.py
+++ b/funtests/tests/test_zookeeper.py
@@ -2,8 +2,8 @@ from funtests import transport
class test_zookeeper(transport.TransportCase):
- transport = "zookeeper"
- prefix = "zookeeper"
+ transport = 'zookeeper'
+ prefix = 'zookeeper'
event_loop_max = 100
def after_connect(self, connection):
diff --git a/funtests/transport.py b/funtests/transport.py
index 1b4e65ef..434fb2b7 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -20,7 +20,7 @@ else:
def say(msg):
- sys.stderr.write(unicode(msg) + "\n")
+ sys.stderr.write(unicode(msg) + '\n')
def consumeN(conn, consumer, n=1, timeout=30):
@@ -39,7 +39,7 @@ def consumeN(conn, consumer, n=1, timeout=30):
conn.drain_events(timeout=1)
except socket.timeout:
seconds += 1
- msg = "Received %s/%s messages. %s seconds passed." % (
+ msg = 'Received %s/%s messages. %s seconds passed.' % (
len(messages), n, seconds)
if seconds >= timeout:
raise socket.timeout(msg)
@@ -83,7 +83,7 @@ class TransportCase(unittest.TestCase):
self.skip_test_reason = str(exc)
else:
self.do_connect()
- self.exchange = Exchange(self.prefix, "direct")
+ self.exchange = Exchange(self.prefix, 'direct')
self.queue = Queue(self.prefix, self.exchange, self.prefix)
def purge(self, names):
@@ -101,9 +101,9 @@ class TransportCase(unittest.TestCase):
def get_connection(self, **options):
if self.userid:
- options.setdefault("userid", self.userid)
+ options.setdefault('userid', self.userid)
if self.password:
- options.setdefault("password", self.password)
+ options.setdefault('password', self.password)
return Connection(transport=self.transport, **options)
def do_connect(self):
@@ -112,7 +112,7 @@ class TransportCase(unittest.TestCase):
self.connection.connect()
self.after_connect(self.connection)
except self.connection.connection_errors:
- self.skip_test_reason = "%s transport can't connect" % (
+ self.skip_test_reason = '%s transport cannot connect' % (
self.transport, )
else:
self.connected = True
@@ -133,9 +133,9 @@ class TransportCase(unittest.TestCase):
consumer = chan1.Consumer(self.queue)
self.purge_consumer(consumer)
producer = chan1.Producer(self.exchange)
- producer.publish({"foo": "bar"}, routing_key=self.prefix)
+ producer.publish({'foo': 'bar'}, routing_key=self.prefix)
message = consumeN(self.connection, consumer)
- self.assertDictEqual(message[0], {"foo": "bar"})
+ self.assertDictEqual(message[0], {'foo': 'bar'})
chan1.close()
self.purge([self.queue.name])
@@ -148,7 +148,7 @@ class TransportCase(unittest.TestCase):
producer = chan1.Producer(self.exchange)
for i in xrange(10):
- producer.publish({"foo": "bar"}, routing_key=self.prefix)
+ producer.publish({'foo': 'bar'}, routing_key=self.prefix)
if self.reliable_purge:
self.assertEqual(consumer.purge(), 10)
self.assertEqual(consumer.purge(), 0)
@@ -167,8 +167,8 @@ class TransportCase(unittest.TestCase):
if not self.verify_alive():
return
bytes = min(filter(None, [bytes, self.message_size_limit]))
- messages = ["".join(random.choice(charset)
- for j in xrange(bytes)) + "--%s" % n
+ messages = [''.join(random.choice(charset)
+ for j in xrange(bytes)) + '--%s' % n
for i in xrange(n)]
digests = []
chan1 = self.connection.channel()
@@ -176,22 +176,22 @@ class TransportCase(unittest.TestCase):
self.purge_consumer(consumer)
producer = chan1.Producer(self.exchange)
for i, message in enumerate(messages):
- producer.publish({"text": message,
- "i": i}, routing_key=self.prefix)
+ producer.publish({'text': message,
+ 'i': i}, routing_key=self.prefix)
digests.append(self._digest(message))
- received = [(msg["i"], msg["text"])
+ received = [(msg['i'], msg['text'])
for msg in consumeN(self.connection, consumer, n)]
self.assertEqual(len(received), n)
ordering = [i for i, _ in received]
if ordering != range(n) and not self.suppress_disorder_warning:
warnings.warn(
- "%s did not deliver messages in FIFO order: %r" % (
+ '%s did not deliver messages in FIFO order: %r' % (
self.transport, ordering))
for i, text in received:
if text != messages[i]:
- raise AssertionError("%i: %r is not %r" % (
+ raise AssertionError('%i: %r is not %r' % (
i, text[-100:], messages[i][-100:]))
self.assertEqual(self._digest(text), digests[i])
@@ -199,30 +199,30 @@ class TransportCase(unittest.TestCase):
self.purge([self.queue.name])
def P(self, rest):
- return "%s%s%s" % (self.prefix, self.sep, rest)
+ return '%s%s%s' % (self.prefix, self.sep, rest)
def test_produce__consume_multiple(self):
if not self.verify_alive():
return
chan1 = self.connection.channel()
producer = chan1.Producer(self.exchange)
- b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1)
- b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1)
- b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1)
+ b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1)
+ b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1)
+ b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1)
[q.declare() for q in (b1, b2, b3)]
self.purge([b1.name, b2.name, b3.name])
- producer.publish("b1", routing_key="b1")
- producer.publish("b2", routing_key="b2")
- producer.publish("b3", routing_key="b3")
+ producer.publish('b1', routing_key='b1')
+ producer.publish('b2', routing_key='b2')
+ producer.publish('b3', routing_key='b3')
chan1.close()
chan2 = self.connection.channel()
consumer = chan2.Consumer([b1, b2, b3])
messages = consumeN(self.connection, consumer, 3)
- self.assertItemsEqual(messages, ["b1", "b2", "b3"])
+ self.assertItemsEqual(messages, ['b1', 'b2', 'b3'])
chan2.close()
- self.purge([self.P("b1"), self.P("b2"), self.P("b3")])
+ self.purge([self.P('b1'), self.P('b2'), self.P('b3')])
def test_timeout(self):
if not self.verify_alive():
@@ -242,10 +242,10 @@ class TransportCase(unittest.TestCase):
chan1 = self.connection.channel()
producer = chan1.Producer(self.exchange)
chan2 = self.connection.channel()
- queue = Queue(self.P("basic_get"), self.exchange, "basic_get")
+ queue = Queue(self.P('basic_get'), self.exchange, 'basic_get')
queue = queue(chan2)
queue.declare()
- producer.publish({"basic.get": "this"}, routing_key="basic_get")
+ producer.publish({'basic.get': 'this'}, routing_key='basic_get')
chan1.close()
for i in range(self.event_loop_max):
@@ -253,7 +253,7 @@ class TransportCase(unittest.TestCase):
if m:
break
time.sleep(0.1)
- self.assertEqual(m.payload, {"basic.get": "this"})
+ self.assertEqual(m.payload, {'basic.get': 'this'})
self.purge([queue.name])
chan2.close()
diff --git a/kombu/entity.py b/kombu/entity.py
index 32b199c3..682a2628 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -123,6 +123,7 @@ class Exchange(MaybeChannelBound):
type = 'direct'
durable = True
auto_delete = False
+ passive = False
delivery_mode = PERSISTENT_DELIVERY_MODE
attrs = (
@@ -130,6 +131,7 @@ class Exchange(MaybeChannelBound):
('type', None),
('arguments', None),
('durable', bool),
+ ('passive', bool),
('auto_delete', bool),
('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m),
)
@@ -143,7 +145,7 @@ class Exchange(MaybeChannelBound):
def __hash__(self):
return hash('E|%s' % (self.name, ))
- def declare(self, nowait=False, passive=False):
+ def declare(self, nowait=False, passive=None):
"""Declare the exchange.
Creates the exchange on the broker.
@@ -152,6 +154,7 @@ class Exchange(MaybeChannelBound):
response will not be waited for. Default is :const:`False`.
"""
+ passive = self.passive if passive is None else passive
if self.name:
return self.channel.exchange_declare(
exchange=self.name, type=self.type, durable=self.durable,
@@ -541,8 +544,8 @@ class Queue(MaybeChannelBound):
Returns the message instance if a message was available,
or :const:`None` otherwise.
- :keyword no_ack: If set messages received does not have to
- be acknowledged.
+ :keyword no_ack: If enabled the broker will automatically
+ ack messages.
This method provides direct access to the messages in a
queue using a synchronous dialogue, designed for
@@ -575,8 +578,8 @@ class Queue(MaybeChannelBound):
can use the same consumer tags. If this field is empty
the server will generate a unique tag.
- :keyword no_ack: If set messages received does not have to
- be acknowledged.
+ :keyword no_ack: If enabled the broker will automatically ack
+ messages.
:keyword nowait: Do not wait for a reply.
diff --git a/kombu/messaging.py b/kombu/messaging.py
index a235da2e..b7de2396 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -280,8 +280,12 @@ class Consumer(object):
#: consume from.
queues = None
- #: Flag for message acknowledgment disabled/enabled.
- #: Enabled by default.
+ #: Flag for automatic message acknowledgment.
+ #: If enabled the messages are automatically acknowledged by the
+ #: broker. This can increase performance but means that you
+ #: have no control of when the message is removed.
+ #:
+ #: Disabled by default.
no_ack = None
#: By default all entities will be declared at instantiation, if you
@@ -403,6 +407,12 @@ class Consumer(object):
pass
def add_queue(self, queue):
+ """Add a queue to the list of queues to consume from.
+
+ This will not start consuming from the queue,
+ for that you will have to call :meth:`consume` after.
+
+ """
queue = queue(self.channel)
if self.auto_declare:
queue.declare()
@@ -410,9 +420,26 @@ class Consumer(object):
return queue
def add_queue_from_dict(self, queue, **options):
+ """This method is deprecated.
+
+ Instead please use::
+
+ consumer.add_queue(Queue.from_dict(d))
+
+ """
return self.add_queue(Queue.from_dict(queue, **options))
def consume(self, no_ack=None):
+ """Start consuming messages.
+
+ Can be called multiple times, but note that while it
+ will consume from new queues added since the last call,
+ it will not cancel consuming from removed queues (
+ use :meth:`cancel_by_queue`).
+
+ :param no_ack: See :attr:`no_ack`.
+
+ """
if self.queues:
no_ack = self.no_ack if no_ack is None else no_ack
@@ -445,10 +472,12 @@ class Consumer(object):
self.channel.basic_cancel(tag)
def consuming_from(self, queue):
+ """Returns :const:`True` if the consumer is currently
+ consuming from queue'."""
name = queue
if isinstance(queue, Queue):
name = queue.name
- return any(q.name == name for q in self.queues)
+ return name in self._active_tags
def purge(self):
"""Purge messages from all queues.
diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py
index ad8a62c3..82bf6bb1 100644
--- a/kombu/tests/__init__.py
+++ b/kombu/tests/__init__.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import anyjson
+import atexit
import os
import sys
@@ -16,6 +17,21 @@ except ImportError:
anyjson.force_implementation('simplejson')
+def teardown():
+ # Workaround for multiprocessing bug where logging
+ # is attempted after global already collected at shutdown.
+ cancelled = set()
+ try:
+ import multiprocessing.util
+ cancelled.add(multiprocessing.util._exit_function)
+ except (AttributeError, ImportError):
+ pass
+
+ atexit._exithandlers[:] = [
+ e for e in atexit._exithandlers if e[0] not in cancelled
+ ]
+
+
def find_distribution_modules(name=__name__, file=__file__):
current_dist_depth = len(name.split('.')) - 1
current_dist = os.path.join(os.path.dirname(file),
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index efd27e8c..20f4446e 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -15,7 +15,6 @@ from .utils import Mock
def get_conn():
return Connection(transport=Transport)
-
class test_binding(TestCase):
def test_constructor(self):
@@ -129,6 +128,10 @@ class test_Exchange(TestCase):
exc = Exchange('foo', 'direct', delivery_mode='transient')
self.assertEqual(exc.delivery_mode, Exchange.TRANSIENT_DELIVERY_MODE)
+ def test_set_passive_mode(self):
+ exc = Exchange('foo', 'direct', passive=True)
+ self.assertTrue(exc.passive)
+
def test_set_persistent_delivery_mode(self):
exc = Exchange('foo', 'direct', delivery_mode='persistent')
self.assertEqual(exc.delivery_mode, Exchange.PERSISTENT_DELIVERY_MODE)
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 296b02a2..9069d6fe 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -258,9 +258,13 @@ class test_Consumer(TestCase):
def test_consuming_from(self):
consumer = self.connection.Consumer()
- consumer.queues[:] = [Queue('a'), Queue('b')]
+ consumer.queues[:] = [Queue('a'), Queue('b'), Queue('d')]
+ consumer._active_tags = {'a': 1, 'b': 2}
+
self.assertFalse(consumer.consuming_from(Queue('c')))
self.assertFalse(consumer.consuming_from('c'))
+ self.assertFalse(consumer.consuming_from(Queue('d')))
+ self.assertFalse(consumer.consuming_from('d'))
self.assertTrue(consumer.consuming_from(Queue('a')))
self.assertTrue(consumer.consuming_from(Queue('b')))
self.assertTrue(consumer.consuming_from('b'))
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index e3211d65..ee6246c1 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -29,6 +29,10 @@ from . import base
DEFAULT_PORT = 5672
+NO_SSL_ERROR = """\
+ssl not supported by librabbitmq, please use pyamqp:// or stunnel\
+"""
+
class Message(base.Message):
@@ -99,6 +103,8 @@ class Transport(base.Transport):
for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
+ if conninfo.ssl:
+ raise NotImplementedError(NO_SSL_ERROR)
conn = self.Connection(host=conninfo.host,
userid=conninfo.userid,
password=conninfo.password,
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index a6fed4d9..7c9c0606 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -205,7 +205,7 @@ class MultiChannelPoller(object):
for fd in values(self._chan_to_sock):
try:
self.poller.unregister(fd)
- except KeyError:
+ except (KeyError, ValueError):
pass
self._channels.clear()
self._fd_to_chan.clear()
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 2bd23c5b..0cebee2f 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -10,7 +10,7 @@ from __future__ import absolute_import
import errno
import socket
-from select import select as _selectf
+from select import select as _selectf, error as _selecterr
try:
from select import epoll
@@ -53,6 +53,11 @@ READ = POLL_READ = 0x001
WRITE = POLL_WRITE = 0x004
ERR = POLL_ERR = 0x008 | 0x010
+try:
+ SELECT_BAD_FD = set((errno.EBADF, errno.WSAENOTSOCK))
+except AttributeError:
+ SELECT_BAD_FD = set((errno.EBADF,))
+
class Poller(object):
@@ -79,11 +84,9 @@ class _epoll(Poller):
def unregister(self, fd):
try:
self._epoll.unregister(fd)
- except socket.error:
- pass
- except ValueError:
+ except (socket.error, ValueError, KeyError):
pass
- except IOError as exc:
+ except (IOError, OSError) as exc:
if get_errno(exc) != errno.ENOENT:
raise
@@ -191,13 +194,30 @@ class _select(Poller):
if events & READ:
self._rfd.add(fd)
+ def _remove_bad(self):
+ for fd in self._rfd | self._wfd | self._efd:
+ try:
+ _selectf([fd], [], [], 0)
+ except _selecterr, exc:
+ if get_errno(exc) in SELECT_BAD_FD:
+ self.unregister(fd)
+
def unregister(self, fd):
self._rfd.discard(fd)
self._wfd.discard(fd)
self._efd.discard(fd)
def _poll(self, timeout):
- read, write, error = _selectf(self._rfd, self._wfd, self._efd, timeout)
+ try:
+ read, write, error = _selectf(
+ self._rfd, self._wfd, self._efd, timeout,
+ )
+ except _selecterr, exc:
+ if get_errno(exc) == errno.EINTR:
+ return
+ elif get_errno(exc) in SELECT_BAD_FD:
+ self._remove_bad()
+
events = {}
for fd in read:
if not isinstance(fd, int):