summaryrefslogtreecommitdiff
path: root/python/tests/basic.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/basic.py')
-rw-r--r--python/tests/basic.py115
1 files changed, 115 insertions, 0 deletions
diff --git a/python/tests/basic.py b/python/tests/basic.py
new file mode 100644
index 0000000000..b912fc40be
--- /dev/null
+++ b/python/tests/basic.py
@@ -0,0 +1,115 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class BasicTests(TestBase):
+ """Tests for 'methods' on the amqp basic 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
+ channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.basic_publish(exchange="amq.direct", routing_key="test-queue-1a", content=Content("consume_no_local"))
+ channel.basic_publish(exchange="amq.direct", routing_key="test-queue-1b", content=Content("consume_no_local"))
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2")
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ channel.basic_consume(queue="invalid-queue")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.basic_consume(queue="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ try:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+