diff options
author | Ask Solem <ask@celeryproject.org> | 2011-04-06 15:59:16 +0200 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2011-04-06 15:59:16 +0200 |
commit | 348babd81b2a33c39fc128aa5e5ed570d6fa9eb1 (patch) | |
tree | b76adb188ec3582e104ccb29a66bd2bc4c9ac452 /funtests | |
parent | ee2ae37640d6f224d300d868580c2a5f8a19f1fd (diff) | |
download | kombu-348babd81b2a33c39fc128aa5e5ed570d6fa9eb1.tar.gz |
Redis: Fixes serious issue where larger messages may be lost when consuming using the Redis transport (ask/celery issue #318). Thanks to Simon Zimmerman, Andy McCurdy and Honza Král!
Diffstat (limited to 'funtests')
-rw-r--r-- | funtests/tests/test_beanstalk.py | 1 | ||||
-rw-r--r-- | funtests/tests/test_pika.py | 6 | ||||
-rw-r--r-- | funtests/transport.py | 86 |
3 files changed, 81 insertions, 12 deletions
diff --git a/funtests/tests/test_beanstalk.py b/funtests/tests/test_beanstalk.py index d882cc25..5af87f7f 100644 --- a/funtests/tests/test_beanstalk.py +++ b/funtests/tests/test_beanstalk.py @@ -5,6 +5,7 @@ class test_beanstalk(transport.TransportCase): transport = "beanstalk" prefix = "beanstalk" event_loop_max = 10 + message_size_limit = 47662 def after_connect(self, connection): connection.channel().client diff --git a/funtests/tests/test_pika.py b/funtests/tests/test_pika.py index 02e51303..807df1c3 100644 --- a/funtests/tests/test_pika.py +++ b/funtests/tests/test_pika.py @@ -1,11 +1,17 @@ from funtests import transport +from nose import SkipTest class test_pika_blocking(transport.TransportCase): transport = "syncpika" prefix = "syncpika" + def test_produce__consume_large_messages(self, *args, **kwargs): + raise SkipTest("test currently fails for sync pika") class test_pika_async(transport.TransportCase): transport = "pika" prefix = "pika" + + def test_produce__consume_large_messages(self, *args, **kwargs): + raise SkipTest("test currently fails for async pika") diff --git a/funtests/transport.py b/funtests/transport.py index d44201da..a25dafb3 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -1,14 +1,27 @@ +import random import socket +import string +import sys import time import unittest2 as unittest +import warnings from nose import SkipTest from kombu import BrokerConnection from kombu import Producer, Consumer, Exchange, Queue +if sys.version_info >= (2, 5): + from hashlib import sha256 as _digest +else: + from sha import new as _digest -def consumeN(conn, consumer, n=1): + +def say(msg): + sys.stderr.write(unicode(msg) + "\n") + + +def consumeN(conn, consumer, n=1, timeout=30): messages = [] def callback(message_data, message): @@ -18,11 +31,18 @@ def consumeN(conn, consumer, n=1): prev, consumer.callbacks = consumer.callbacks, [callback] consumer.consume() + seconds = 0 while True: try: conn.drain_events(timeout=1) except socket.timeout: - pass + seconds += 1 + msg = "Received %s/%s messages. %s seconds passed." % ( + len(messages), n, seconds) + if seconds >= timeout: + raise socket.timeout(msg) + if seconds > 1: + say(msg) if len(messages) >= n: break @@ -40,6 +60,8 @@ class TransportCase(unittest.TestCase): connected = False skip_test_reason = None + message_size_limit = None + def before_connect(self): pass @@ -76,13 +98,14 @@ class TransportCase(unittest.TestCase): self.connected = True def verify_alive(self): - if not self.connected: - raise SkipTest(self.skip_test_reason) + if self.transport: + if not self.connected: + raise SkipTest(self.skip_test_reason) + return True def test_produce__consume(self): - if not self.transport: + if not self.verify_alive(): return - self.verify_alive() chan1 = self.connection.channel() consumer = Consumer(chan1, self.queue) consumer.queues[0].purge() @@ -93,13 +116,54 @@ class TransportCase(unittest.TestCase): chan1.close() self.purge([self.queue.name]) + def _digest(self, data): + return _digest(data).hexdigest() + + def test_produce__consume_large_messages(self, bytes=1048576, n=10, + charset=string.punctuation + string.letters + string.digits): + if not self.verify_alive(): + return + bytes = min(filter(None, [bytes, self.message_size_limit])) + messages = ["".join(random.choice(charset) + for j in xrange(bytes)) + "--%s" % n + for i in xrange(n)] + digests = [] + chan1 = self.connection.channel() + consumer = Consumer(chan1, self.queue) + for queue in consumer.queues: + queue.purge() + producer = Producer(chan1, self.exchange) + for i, message in enumerate(messages): + producer.publish({"text": message, + "i": i}, routing_key=self.prefix) + digests.append(self._digest(message)) + + received = [(msg["i"], msg["text"]) + for msg in consumeN(self.connection, consumer, n)] + self.assertEqual(len(received), n) + ordering = [i for i, _ in received] + if ordering != range(n): + warnings.warn( + "%s did not deliver messages in FIFO order: %r" % ( + self.transport, ordering)) + + for i, text in received: + if text != messages[i]: + raise AssertionError("%i: %r is not %r" % ( + i, text[-100:], messages[i][-100:])) + self.assertEqual(self._digest(text), digests[i]) + + chan1.close() + self.purge([self.queue.name]) + + + def P(self, rest): return "%s.%s" % (self.prefix, rest) def test_produce__consume_multiple(self): - if not self.transport: + if not self.verify_alive(): return - self.verify_alive() chan1 = self.connection.channel() producer = Producer(chan1, self.exchange) b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1) @@ -121,9 +185,8 @@ class TransportCase(unittest.TestCase): self.purge([self.P("b1"), self.P("b2"), self.P("b3")]) def test_timeout(self): - if not self.transport: + if not self.verify_alive(): return - self.verify_alive() chan = self.connection.channel() self.purge([self.queue.name]) consumer = Consumer(chan, self.queue) @@ -132,9 +195,8 @@ class TransportCase(unittest.TestCase): consumer.cancel() def test_basic_get(self): - if not self.transport: + if not self.verify_alive(): return - self.verify_alive() chan1 = self.connection.channel() producer = Producer(chan1, self.exchange) chan2 = self.connection.channel() |