From 897ee5c3e17c225b24465e9691bae689b4955831 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 7 Jun 2011 15:20:09 +0100 Subject: Passes functional tests --- funtests/tests/test_SQS.py | 1 + funtests/transport.py | 58 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 46 insertions(+), 13 deletions(-) (limited to 'funtests') diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py index 4fc31a0a..56a4cefe 100644 --- a/funtests/tests/test_SQS.py +++ b/funtests/tests/test_SQS.py @@ -10,6 +10,7 @@ class test_SQS(transport.TransportCase): prefix = "sqs" event_loop_max = 100 message_size_limit = 4192 # SQS max body size / 2. + reliable_purge = False suppress_disorder_warning = True # does not guarantee FIFO order, # even in simple cases. diff --git a/funtests/transport.py b/funtests/transport.py index ced4cca1..add18647 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -62,6 +62,7 @@ class TransportCase(unittest.TestCase): event_loop_max = 100 connection_options = {} suppress_disorder_warning = False + reliable_purge = True connected = False skip_test_reason = None @@ -87,7 +88,16 @@ class TransportCase(unittest.TestCase): def purge(self, names): chan = self.connection.channel() - map(chan.queue_purge, names) + total = 0 + for queue in names: + while 1: + # ensure the queue is completly empty + purged = chan.queue_purge(queue=queue) + if not purged: + break + total += purged + chan.close() + return total def get_connection(self, **options): if self.userid: @@ -113,19 +123,40 @@ class TransportCase(unittest.TestCase): raise SkipTest(self.skip_test_reason) return True + def purge_consumer(self, consumer): + return self.purge([queue.name for queue in consumer.queues]) + def test_produce__consume(self): if not self.verify_alive(): return chan1 = self.connection.channel() - consumer = Consumer(chan1, self.queue) - consumer.queues[0].purge() - producer = Producer(chan1, self.exchange) + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) producer.publish({"foo": "bar"}, routing_key=self.prefix) message = consumeN(self.connection, consumer) self.assertDictEqual(message[0], {"foo": "bar"}) chan1.close() self.purge([self.queue.name]) + def test_purge(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + + producer = chan1.Producer(self.exchange) + for i in xrange(10): + producer.publish({"foo": "bar"}, routing_key=self.prefix) + if self.reliable_purge: + self.assertEqual(consumer.purge(), 10) + self.assertEqual(consumer.purge(), 0) + else: + purged = 0 + while purged < 9: + purged += self.purge_consumer(consumer) + def _digest(self, data): return _digest(data).hexdigest() @@ -140,10 +171,9 @@ class TransportCase(unittest.TestCase): for i in xrange(n)] digests = [] chan1 = self.connection.channel() - consumer = Consumer(chan1, self.queue) - for queue in consumer.queues: - queue.purge() - producer = Producer(chan1, self.exchange) + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) for i, message in enumerate(messages): producer.publish({"text": message, "i": i}, routing_key=self.prefix) @@ -174,12 +204,12 @@ class TransportCase(unittest.TestCase): if not self.verify_alive(): return chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) + producer = chan1.Producer(self.exchange) b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1) b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1) b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1) [q.declare() for q in (b1, b2, b3)] - [q.purge() for q in (b1, b2, b3)] + self.purge([b1.name, b2.name, b3.name]) producer.publish("b1", routing_key="b1") producer.publish("b2", routing_key="b2") @@ -187,7 +217,7 @@ class TransportCase(unittest.TestCase): chan1.close() chan2 = self.connection.channel() - consumer = Consumer(chan2, [b1, b2, b3]) + consumer = chan2.Consumer([b1, b2, b3]) messages = consumeN(self.connection, consumer, 3) self.assertItemsEqual(messages, ["b1", "b2", "b3"]) chan2.close() @@ -198,16 +228,17 @@ class TransportCase(unittest.TestCase): return chan = self.connection.channel() self.purge([self.queue.name]) - consumer = Consumer(chan, self.queue) + consumer = chan.Consumer(self.queue) self.assertRaises(socket.timeout, self.connection.drain_events, timeout=0.3) consumer.cancel() + chan.close() def test_basic_get(self): if not self.verify_alive(): return chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) + producer = chan1.Producer(self.exchange) chan2 = self.connection.channel() queue = Queue(self.P("basic_get"), self.exchange, "basic_get") queue = queue(chan2) @@ -221,6 +252,7 @@ class TransportCase(unittest.TestCase): break time.sleep(0.1) self.assertEqual(m.payload, {"basic.get": "this"}) + self.purge([queue.name]) chan2.close() def test_cyclic_reference_transport(self): -- cgit v1.2.1 From 70e2c6b68cd8151c285e45a9121113dd98580d7e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 7 Jun 2011 15:46:11 +0100 Subject: PEP8ify --- funtests/transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'funtests') diff --git a/funtests/transport.py b/funtests/transport.py index add18647..765e9d41 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -10,7 +10,7 @@ import weakref from nose import SkipTest from kombu import BrokerConnection -from kombu import Producer, Consumer, Exchange, Queue +from kombu import Exchange, Queue from kombu.tests.utils import skip_if_quick if sys.version_info >= (2, 5): -- cgit v1.2.1