summaryrefslogtreecommitdiff
path: root/tests/test_functional.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_functional.py')
-rw-r--r--tests/test_functional.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/tests/test_functional.py b/tests/test_functional.py
new file mode 100644
index 0000000..03d6276
--- /dev/null
+++ b/tests/test_functional.py
@@ -0,0 +1,189 @@
+from __future__ import absolute_import
+
+from six.moves import xrange
+
+import socket
+import unittest
+
+from librabbitmq import Message, Connection, ConnectionError, ChannelError
+TEST_QUEUE = 'pyrabbit.testq'
+
+
+class test_Channel(unittest.TestCase):
+
+ def setUp(self):
+ self.connection = Connection(host='localhost:5672', userid='guest',
+ password='guest', virtual_host='/')
+ self.channel = self.connection.channel()
+ self.channel.queue_delete(TEST_QUEUE)
+ self._queue_declare()
+
+ def test_send_message(self):
+ message = Message(
+ channel=self.channel,
+ body='the quick brown fox jumps over the lazy dog',
+ properties=dict(content_type='application/json',
+ content_encoding='utf-8'))
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.assertGreater(self.channel.queue_purge(TEST_QUEUE), 2)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+
+ def _queue_declare(self):
+ self.channel.exchange_declare(TEST_QUEUE, 'direct')
+ x = self.channel.queue_declare(TEST_QUEUE)
+ self.assertEqual(x.message_count, x[1])
+ self.assertEqual(x.consumer_count, x[2])
+ self.assertEqual(x.queue, TEST_QUEUE)
+ self.channel.queue_bind(TEST_QUEUE, TEST_QUEUE, TEST_QUEUE)
+
+ def test_basic_get_ack(self):
+ message = Message(
+ channel=self.channel,
+ body='the quick brown fox jumps over the lazy dog',
+ properties=dict(content_type='application/json',
+ content_encoding='utf-8'))
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+ while True:
+ x = self.channel.basic_get(TEST_QUEUE)
+ if x:
+ break
+ self.assertIs(self.channel, x.channel)
+ self.assertIn('message_count', x.delivery_info)
+ self.assertIn('redelivered', x.delivery_info)
+ self.assertEqual(x.delivery_info['routing_key'], TEST_QUEUE)
+ self.assertEqual(x.delivery_info['exchange'], TEST_QUEUE)
+ self.assertTrue(x.delivery_info['delivery_tag'])
+ self.assertTrue(x.properties['content_type'])
+ self.assertTrue(x.body)
+ x.ack()
+
+ def test_timeout_burst(self):
+ """Check that if we have a large burst of messages in our queue
+ that we can fetch them with a timeout without needing to receive
+ any more messages."""
+
+ message = Message(
+ channel=self.channel,
+ body='the quick brown fox jumps over the lazy dog',
+ properties=dict(content_type='application/json',
+ content_encoding='utf-8'))
+
+ for i in xrange(100):
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+
+ messages = []
+
+ def cb(x):
+ messages.append(x)
+ x.ack()
+
+ self.channel.basic_consume(TEST_QUEUE, callback=cb)
+ for i in xrange(100):
+ self.connection.drain_events(timeout=0.2)
+
+ self.assertEqual(len(messages), 100)
+
+ def test_timeout(self):
+ """Check that our ``drain_events`` call actually times out if
+ there are no messages."""
+ message = Message(
+ channel=self.channel,
+ body='the quick brown fox jumps over the lazy dog',
+ properties=dict(content_type='application/json',
+ content_encoding='utf-8'))
+
+ self.channel.basic_publish(message, TEST_QUEUE, TEST_QUEUE)
+
+ messages = []
+
+ def cb(x):
+ messages.append(x)
+ x.ack()
+
+ self.channel.basic_consume(TEST_QUEUE, callback=cb)
+ self.connection.drain_events(timeout=0.1)
+
+ with self.assertRaises(socket.timeout):
+ self.connection.drain_events(timeout=0.1)
+ self.assertEqual(len(messages), 1)
+
+ def tearDown(self):
+ if self.channel and self.connection.connected:
+ self.channel.queue_purge(TEST_QUEUE)
+ self.channel.close()
+ if self.connection:
+ try:
+ self.connection.close()
+ except ConnectionError:
+ pass
+
+
+class test_Delete(unittest.TestCase):
+
+ def setUp(self):
+ self.connection = Connection(host='localhost:5672', userid='guest',
+ password='guest', virtual_host='/')
+ self.channel = self.connection.channel()
+ self.TEST_QUEUE = 'pyrabbitmq.testq2'
+ self.channel.queue_delete(self.TEST_QUEUE)
+
+ def test_delete(self):
+ """Test that we can declare a channel delete it, and then declare with
+ different properties"""
+
+ self.channel.exchange_declare(self.TEST_QUEUE, 'direct')
+ self.channel.queue_declare(self.TEST_QUEUE)
+ self.channel.queue_bind(
+ self.TEST_QUEUE, self.TEST_QUEUE, self.TEST_QUEUE,
+ )
+
+ # Delete the queue
+ self.channel.queue_delete(self.TEST_QUEUE)
+
+ # Declare it again
+ x = self.channel.queue_declare(self.TEST_QUEUE, durable=True)
+ self.assertEqual(x.queue, self.TEST_QUEUE)
+
+ self.channel.queue_delete(self.TEST_QUEUE)
+
+ def test_delete_empty(self):
+ """Test that the queue doesn't get deleted if it is not empty"""
+ self.channel.exchange_declare(self.TEST_QUEUE, 'direct')
+ self.channel.queue_declare(self.TEST_QUEUE)
+ self.channel.queue_bind(self.TEST_QUEUE, self.TEST_QUEUE,
+ self.TEST_QUEUE)
+
+ message = Message(
+ channel=self.channel,
+ body='the quick brown fox jumps over the lazy dog',
+ properties=dict(content_type='application/json',
+ content_encoding='utf-8'))
+
+ self.channel.basic_publish(message, self.TEST_QUEUE, self.TEST_QUEUE)
+
+ with self.assertRaises(ChannelError):
+ self.channel.queue_delete(self.TEST_QUEUE, if_empty=True)
+
+ # We need to make a new channel after a ChannelError
+ self.channel = self.connection.channel()
+
+ x = self.channel.basic_get(self.TEST_QUEUE)
+ self.assertTrue(x.body)
+
+ self.channel.queue_delete(self.TEST_QUEUE, if_empty=True)
+
+ def tearDown(self):
+ if self.channel and self.connection.connected:
+ self.channel.queue_purge(TEST_QUEUE)
+ self.channel.close()
+ if self.connection:
+ try:
+ self.connection.close()
+ except ConnectionError:
+ pass