summaryrefslogtreecommitdiff
path: root/funtests
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-02-03 17:21:27 +0000
committerAsk Solem <ask@celeryproject.org>2012-02-03 17:21:27 +0000
commit2b140266c5eaed0d53445867732b64bbafdb35e4 (patch)
treeb3a92241a460b4f1c2d327cec58fe36c0f45dcb1 /funtests
parentcfea2062b060c32cc035e3f025e5b96805738bd9 (diff)
downloadkombu-2b140266c5eaed0d53445867732b64bbafdb35e4.tar.gz
Moves MongoDB functional tests to funtests/
Diffstat (limited to 'funtests')
-rw-r--r--funtests/tests/test_mongodb.py61
1 files changed, 60 insertions, 1 deletions
diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py
index 8d6353c2..83932364 100644
--- a/funtests/tests/test_mongodb.py
+++ b/funtests/tests/test_mongodb.py
@@ -1,3 +1,6 @@
+from kombu import Consumer, Producer, Exchange, Queue
+from kombu.utils import nested
+
from funtests import transport
@@ -7,4 +10,60 @@ class test_mongodb(transport.TransportCase):
event_loop_max = 100
def after_connect(self, connection):
- connection.channel().client
+ connection.channel().client # evaluate connection.
+
+ self.c = self.connection # shortcut
+
+ def test_fanout(self, name="test_mongodb_fanout"):
+ c = self.connection
+ e = Exchange(name, type="fanout")
+ q = Queue(name, exchange=e, routing_key=name)
+ q2 = Queue(name + "2", exchange=e, routing_key=name + "2")
+
+ channel = c.default_channel
+ producer = Producer(channel, e)
+ consumer1 = Consumer(channel, q)
+ consumer2 = Consumer(channel, q2)
+ self.q2(channel).declare()
+
+ for i in xrange(10):
+ producer.publish({"foo": i}, routing_key=name)
+ for i in xrange(10):
+ producer.publish({"foo": i}, routing_key=name + "2")
+
+ _received1 = []
+ _received2 = []
+
+ def callback1(message_data, message):
+ _received1.append(message)
+ message.ack()
+
+ def callback2(message_data, message):
+ _received2.append(message)
+ message.ack()
+
+ consumer1.register_callback(callback1)
+ consumer2.register_callback(callback2)
+
+ with nested(consumer1, consumer2):
+
+ while 1:
+ if len(_received1) + len(_received2) == 20:
+ break
+ c.drain_events(timeout=60)
+ self.assertEqual(len(_received1) + len(_received2), 20)
+
+ # queue.delete
+ for i in xrange(10):
+ producer.publish({"foo": i}, routing_key=name)
+ self.assertTrue(self.q(channel).get())
+ self.q(channel).delete()
+ self.q(channel).declare()
+ self.assertIsNone(self.q(channel).get())
+
+ # queue.purge
+ for i in xrange(10):
+ producer.publish({"foo": i}, routing_key=name + "2")
+ self.assertTrue(self.q2(channel).get())
+ self.q2(channel).purge()
+ self.assertIsNone(self.q2(channel).get())