diff options
author | Ask Solem <ask@celeryproject.org> | 2012-02-03 17:21:27 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-02-03 17:21:27 +0000 |
commit | 2b140266c5eaed0d53445867732b64bbafdb35e4 (patch) | |
tree | b3a92241a460b4f1c2d327cec58fe36c0f45dcb1 /funtests | |
parent | cfea2062b060c32cc035e3f025e5b96805738bd9 (diff) | |
download | kombu-2b140266c5eaed0d53445867732b64bbafdb35e4.tar.gz |
Moves MongoDB functional tests to funtests/
Diffstat (limited to 'funtests')
-rw-r--r-- | funtests/tests/test_mongodb.py | 61 |
1 files changed, 60 insertions, 1 deletions
diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py index 8d6353c2..83932364 100644 --- a/funtests/tests/test_mongodb.py +++ b/funtests/tests/test_mongodb.py @@ -1,3 +1,6 @@ +from kombu import Consumer, Producer, Exchange, Queue +from kombu.utils import nested + from funtests import transport @@ -7,4 +10,60 @@ class test_mongodb(transport.TransportCase): event_loop_max = 100 def after_connect(self, connection): - connection.channel().client + connection.channel().client # evaluate connection. + + self.c = self.connection # shortcut + + def test_fanout(self, name="test_mongodb_fanout"): + c = self.connection + e = Exchange(name, type="fanout") + q = Queue(name, exchange=e, routing_key=name) + q2 = Queue(name + "2", exchange=e, routing_key=name + "2") + + channel = c.default_channel + producer = Producer(channel, e) + consumer1 = Consumer(channel, q) + consumer2 = Consumer(channel, q2) + self.q2(channel).declare() + + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name) + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name + "2") + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + with nested(consumer1, consumer2): + + while 1: + if len(_received1) + len(_received2) == 20: + break + c.drain_events(timeout=60) + self.assertEqual(len(_received1) + len(_received2), 20) + + # queue.delete + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name) + self.assertTrue(self.q(channel).get()) + self.q(channel).delete() + self.q(channel).declare() + self.assertIsNone(self.q(channel).get()) + + # queue.purge + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name + "2") + self.assertTrue(self.q2(channel).get()) + self.q2(channel).purge() + self.assertIsNone(self.q2(channel).get()) |