summaryrefslogtreecommitdiff
path: root/funtests
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-06-13 12:16:26 +0100
committerAsk Solem <ask@celeryproject.org>2011-06-13 12:16:26 +0100
commitc83caeb1afafa793e8c597cef904f446412925b9 (patch)
treed807304ab4cd2186a1ec7a5563f81b6c675775d8 /funtests
parent91bc3eb946b13a8be83e56e376529df782e49ab3 (diff)
parenta9bd9ebc8b97cdbf155bdde3e13cb859834ddc23 (diff)
downloadkombu-c83caeb1afafa793e8c597cef904f446412925b9.tar.gz
Merge branch 'master' into SQS-SDB
Diffstat (limited to 'funtests')
-rw-r--r--funtests/tests/test_SQS.py1
-rw-r--r--funtests/transport.py60
2 files changed, 47 insertions, 14 deletions
diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py
index f178a719..c063a161 100644
--- a/funtests/tests/test_SQS.py
+++ b/funtests/tests/test_SQS.py
@@ -10,6 +10,7 @@ class test_SQS(transport.TransportCase):
prefix = "sqs"
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.
+ reliable_purge = False
suppress_disorder_warning = True # does not guarantee FIFO order,
# even in simple cases.
diff --git a/funtests/transport.py b/funtests/transport.py
index ced4cca1..765e9d41 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -10,7 +10,7 @@ import weakref
from nose import SkipTest
from kombu import BrokerConnection
-from kombu import Producer, Consumer, Exchange, Queue
+from kombu import Exchange, Queue
from kombu.tests.utils import skip_if_quick
if sys.version_info >= (2, 5):
@@ -62,6 +62,7 @@ class TransportCase(unittest.TestCase):
event_loop_max = 100
connection_options = {}
suppress_disorder_warning = False
+ reliable_purge = True
connected = False
skip_test_reason = None
@@ -87,7 +88,16 @@ class TransportCase(unittest.TestCase):
def purge(self, names):
chan = self.connection.channel()
- map(chan.queue_purge, names)
+ total = 0
+ for queue in names:
+ while 1:
+ # ensure the queue is completly empty
+ purged = chan.queue_purge(queue=queue)
+ if not purged:
+ break
+ total += purged
+ chan.close()
+ return total
def get_connection(self, **options):
if self.userid:
@@ -113,19 +123,40 @@ class TransportCase(unittest.TestCase):
raise SkipTest(self.skip_test_reason)
return True
+ def purge_consumer(self, consumer):
+ return self.purge([queue.name for queue in consumer.queues])
+
def test_produce__consume(self):
if not self.verify_alive():
return
chan1 = self.connection.channel()
- consumer = Consumer(chan1, self.queue)
- consumer.queues[0].purge()
- producer = Producer(chan1, self.exchange)
+ consumer = chan1.Consumer(self.queue)
+ self.purge_consumer(consumer)
+ producer = chan1.Producer(self.exchange)
producer.publish({"foo": "bar"}, routing_key=self.prefix)
message = consumeN(self.connection, consumer)
self.assertDictEqual(message[0], {"foo": "bar"})
chan1.close()
self.purge([self.queue.name])
+ def test_purge(self):
+ if not self.verify_alive():
+ return
+ chan1 = self.connection.channel()
+ consumer = chan1.Consumer(self.queue)
+ self.purge_consumer(consumer)
+
+ producer = chan1.Producer(self.exchange)
+ for i in xrange(10):
+ producer.publish({"foo": "bar"}, routing_key=self.prefix)
+ if self.reliable_purge:
+ self.assertEqual(consumer.purge(), 10)
+ self.assertEqual(consumer.purge(), 0)
+ else:
+ purged = 0
+ while purged < 9:
+ purged += self.purge_consumer(consumer)
+
def _digest(self, data):
return _digest(data).hexdigest()
@@ -140,10 +171,9 @@ class TransportCase(unittest.TestCase):
for i in xrange(n)]
digests = []
chan1 = self.connection.channel()
- consumer = Consumer(chan1, self.queue)
- for queue in consumer.queues:
- queue.purge()
- producer = Producer(chan1, self.exchange)
+ consumer = chan1.Consumer(self.queue)
+ self.purge_consumer(consumer)
+ producer = chan1.Producer(self.exchange)
for i, message in enumerate(messages):
producer.publish({"text": message,
"i": i}, routing_key=self.prefix)
@@ -174,12 +204,12 @@ class TransportCase(unittest.TestCase):
if not self.verify_alive():
return
chan1 = self.connection.channel()
- producer = Producer(chan1, self.exchange)
+ producer = chan1.Producer(self.exchange)
b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1)
b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1)
b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1)
[q.declare() for q in (b1, b2, b3)]
- [q.purge() for q in (b1, b2, b3)]
+ self.purge([b1.name, b2.name, b3.name])
producer.publish("b1", routing_key="b1")
producer.publish("b2", routing_key="b2")
@@ -187,7 +217,7 @@ class TransportCase(unittest.TestCase):
chan1.close()
chan2 = self.connection.channel()
- consumer = Consumer(chan2, [b1, b2, b3])
+ consumer = chan2.Consumer([b1, b2, b3])
messages = consumeN(self.connection, consumer, 3)
self.assertItemsEqual(messages, ["b1", "b2", "b3"])
chan2.close()
@@ -198,16 +228,17 @@ class TransportCase(unittest.TestCase):
return
chan = self.connection.channel()
self.purge([self.queue.name])
- consumer = Consumer(chan, self.queue)
+ consumer = chan.Consumer(self.queue)
self.assertRaises(socket.timeout, self.connection.drain_events,
timeout=0.3)
consumer.cancel()
+ chan.close()
def test_basic_get(self):
if not self.verify_alive():
return
chan1 = self.connection.channel()
- producer = Producer(chan1, self.exchange)
+ producer = chan1.Producer(self.exchange)
chan2 = self.connection.channel()
queue = Queue(self.P("basic_get"), self.exchange, "basic_get")
queue = queue(chan2)
@@ -221,6 +252,7 @@ class TransportCase(unittest.TestCase):
break
time.sleep(0.1)
self.assertEqual(m.payload, {"basic.get": "this"})
+ self.purge([queue.name])
chan2.close()
def test_cyclic_reference_transport(self):