summaryrefslogtreecommitdiff
path: root/python/tests/basic.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/basic.py')
-rw-r--r--python/tests/basic.py97
1 files changed, 94 insertions, 3 deletions
diff --git a/python/tests/basic.py b/python/tests/basic.py
index 9ffcd11b95..50004614eb 100644
--- a/python/tests/basic.py
+++ b/python/tests/basic.py
@@ -113,7 +113,7 @@ class BasicTests(TestBase):
except Closed, e:
self.assertConnectionException(530, e.args[0])
- def test_basic_cancel(self):
+ def test_cancel(self):
"""
Test compliance of the basic.cancel method
"""
@@ -139,7 +139,7 @@ class BasicTests(TestBase):
channel.basic_cancel(consumer_tag="this-never-existed")
- def test_basic_ack(self):
+ def test_ack(self):
"""
Test basic ack/recover behaviour
"""
@@ -183,7 +183,7 @@ class BasicTests(TestBase):
self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
- def test_basic_recover_requeue(self):
+ def test_recover_requeue(self):
"""
Test requeing on recovery
"""
@@ -238,3 +238,94 @@ class BasicTests(TestBase):
self.fail("Got unexpected message in original queue: " + extra.content.body)
except Empty: None
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 5:
+ channel.basic_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.basic_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.content.body)