diff options
author | Ask Solem <ask@celeryproject.org> | 2011-06-13 12:16:26 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2011-06-13 12:16:26 +0100 |
commit | c83caeb1afafa793e8c597cef904f446412925b9 (patch) | |
tree | d807304ab4cd2186a1ec7a5563f81b6c675775d8 /funtests | |
parent | 91bc3eb946b13a8be83e56e376529df782e49ab3 (diff) | |
parent | a9bd9ebc8b97cdbf155bdde3e13cb859834ddc23 (diff) | |
download | kombu-c83caeb1afafa793e8c597cef904f446412925b9.tar.gz |
Merge branch 'master' into SQS-SDB
Diffstat (limited to 'funtests')
-rw-r--r-- | funtests/tests/test_SQS.py | 1 | ||||
-rw-r--r-- | funtests/transport.py | 60 |
2 files changed, 47 insertions, 14 deletions
diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py index f178a719..c063a161 100644 --- a/funtests/tests/test_SQS.py +++ b/funtests/tests/test_SQS.py @@ -10,6 +10,7 @@ class test_SQS(transport.TransportCase): prefix = "sqs" event_loop_max = 100 message_size_limit = 4192 # SQS max body size / 2. + reliable_purge = False suppress_disorder_warning = True # does not guarantee FIFO order, # even in simple cases. diff --git a/funtests/transport.py b/funtests/transport.py index ced4cca1..765e9d41 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -10,7 +10,7 @@ import weakref from nose import SkipTest from kombu import BrokerConnection -from kombu import Producer, Consumer, Exchange, Queue +from kombu import Exchange, Queue from kombu.tests.utils import skip_if_quick if sys.version_info >= (2, 5): @@ -62,6 +62,7 @@ class TransportCase(unittest.TestCase): event_loop_max = 100 connection_options = {} suppress_disorder_warning = False + reliable_purge = True connected = False skip_test_reason = None @@ -87,7 +88,16 @@ class TransportCase(unittest.TestCase): def purge(self, names): chan = self.connection.channel() - map(chan.queue_purge, names) + total = 0 + for queue in names: + while 1: + # ensure the queue is completly empty + purged = chan.queue_purge(queue=queue) + if not purged: + break + total += purged + chan.close() + return total def get_connection(self, **options): if self.userid: @@ -113,19 +123,40 @@ class TransportCase(unittest.TestCase): raise SkipTest(self.skip_test_reason) return True + def purge_consumer(self, consumer): + return self.purge([queue.name for queue in consumer.queues]) + def test_produce__consume(self): if not self.verify_alive(): return chan1 = self.connection.channel() - consumer = Consumer(chan1, self.queue) - consumer.queues[0].purge() - producer = Producer(chan1, self.exchange) + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) producer.publish({"foo": "bar"}, routing_key=self.prefix) message = consumeN(self.connection, consumer) self.assertDictEqual(message[0], {"foo": "bar"}) chan1.close() self.purge([self.queue.name]) + def test_purge(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + + producer = chan1.Producer(self.exchange) + for i in xrange(10): + producer.publish({"foo": "bar"}, routing_key=self.prefix) + if self.reliable_purge: + self.assertEqual(consumer.purge(), 10) + self.assertEqual(consumer.purge(), 0) + else: + purged = 0 + while purged < 9: + purged += self.purge_consumer(consumer) + def _digest(self, data): return _digest(data).hexdigest() @@ -140,10 +171,9 @@ class TransportCase(unittest.TestCase): for i in xrange(n)] digests = [] chan1 = self.connection.channel() - consumer = Consumer(chan1, self.queue) - for queue in consumer.queues: - queue.purge() - producer = Producer(chan1, self.exchange) + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) for i, message in enumerate(messages): producer.publish({"text": message, "i": i}, routing_key=self.prefix) @@ -174,12 +204,12 @@ class TransportCase(unittest.TestCase): if not self.verify_alive(): return chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) + producer = chan1.Producer(self.exchange) b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1) b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1) b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1) [q.declare() for q in (b1, b2, b3)] - [q.purge() for q in (b1, b2, b3)] + self.purge([b1.name, b2.name, b3.name]) producer.publish("b1", routing_key="b1") producer.publish("b2", routing_key="b2") @@ -187,7 +217,7 @@ class TransportCase(unittest.TestCase): chan1.close() chan2 = self.connection.channel() - consumer = Consumer(chan2, [b1, b2, b3]) + consumer = chan2.Consumer([b1, b2, b3]) messages = consumeN(self.connection, consumer, 3) self.assertItemsEqual(messages, ["b1", "b2", "b3"]) chan2.close() @@ -198,16 +228,17 @@ class TransportCase(unittest.TestCase): return chan = self.connection.channel() self.purge([self.queue.name]) - consumer = Consumer(chan, self.queue) + consumer = chan.Consumer(self.queue) self.assertRaises(socket.timeout, self.connection.drain_events, timeout=0.3) consumer.cancel() + chan.close() def test_basic_get(self): if not self.verify_alive(): return chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) + producer = chan1.Producer(self.exchange) chan2 = self.connection.channel() queue = Queue(self.P("basic_get"), self.exchange, "basic_get") queue = queue(chan2) @@ -221,6 +252,7 @@ class TransportCase(unittest.TestCase): break time.sleep(0.1) self.assertEqual(m.payload, {"basic.get": "this"}) + self.purge([queue.name]) chan2.close() def test_cyclic_reference_transport(self): |