diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
commit | 3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch) | |
tree | d4671a2fbb8ad69a0cc734134b43b28152c3e5b4 /python/tests/basic.py | |
parent | 14654e5360b72adf1704838b3820c7d1fc860e8e (diff) | |
download | qpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz |
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests/basic.py')
-rw-r--r-- | python/tests/basic.py | 97 |
1 files changed, 94 insertions, 3 deletions
diff --git a/python/tests/basic.py b/python/tests/basic.py index 9ffcd11b95..50004614eb 100644 --- a/python/tests/basic.py +++ b/python/tests/basic.py @@ -113,7 +113,7 @@ class BasicTests(TestBase): except Closed, e: self.assertConnectionException(530, e.args[0]) - def test_basic_cancel(self): + def test_cancel(self): """ Test compliance of the basic.cancel method """ @@ -139,7 +139,7 @@ class BasicTests(TestBase): channel.basic_cancel(consumer_tag="this-never-existed") - def test_basic_ack(self): + def test_ack(self): """ Test basic ack/recover behaviour """ @@ -183,7 +183,7 @@ class BasicTests(TestBase): self.fail("Got unexpected message: " + extra.content.body) except Empty: None - def test_basic_recover_requeue(self): + def test_recover_requeue(self): """ Test requeing on recovery """ @@ -238,3 +238,94 @@ class BasicTests(TestBase): self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 5: + channel.basic_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.basic_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) |