#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import testrunner, TestBase


class QueueTests(TestBase):
    """Tests for 'methods' on the amqp queue 'class'"""

    def test_purge(self):
        """
        Test that the purge method removes messages from the queue
        """
        channel = self.channel
        #setup, declare a queue and add some messages to it:
        channel.exchange_declare(exchange="test-exchange", type="direct")
        channel.queue_declare(queue="test-queue", exclusive=True)
        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
        channel.message_transfer(destination="test-exchange", routing_key="key", body="one")
        channel.message_transfer(destination="test-exchange", routing_key="key", body="two")
        channel.message_transfer(destination="test-exchange", routing_key="key", body="three")

        #check that the queue now reports 3 messages:
        reply = channel.queue_declare(queue="test-queue")
        self.assertEqual(3, reply.message_count)

        #now do the purge, then test that three messages are purged and the count drops to 0
        reply = channel.queue_purge(queue="test-queue")
        self.assertEqual(3, reply.message_count)
        reply = channel.queue_declare(queue="test-queue")
        self.assertEqual(0, reply.message_count)

        #send a further message and consume it, ensuring that the other messages are really gone
        channel.message_transfer(destination="test-exchange", routing_key="key", body="four")
        channel.message_consume(queue="test-queue", destination="tag", no_ack=True)
        queue = self.client.queue("tag")
        msg = queue.get(timeout=1)
        self.assertEqual("four", msg.body)

        #check error conditions (use new channels):
        channel = self.client.channel(2)
        channel.channel_open()
        try:
            #queue specified but doesn't exist:
            channel.queue_purge(queue="invalid-queue")
            self.fail("Expected failure when purging non-existent queue")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

        channel = self.client.channel(3)
        channel.channel_open()
        try:
            #queue not specified and none previously declared for channel:
            channel.queue_purge()
            self.fail("Expected failure when purging unspecified queue")
        except Closed as e:
            self.assertConnectionException(530, e.args[0])

        #cleanup: use a fresh connection to remove the exchange declared above
        other = self.connect()
        channel = other.channel(1)
        channel.channel_open()
        channel.exchange_delete(exchange="test-exchange")

    def test_declare_exclusive(self):
        """
        Test that the exclusive field is honoured in queue.declare
        """
        # TestBase.setUp has already opened channel(1)
        c1 = self.channel
        # Here we open a second separate connection:
        other = self.connect()
        c2 = other.channel(1)
        c2.channel_open()

        #declare an exclusive queue:
        c1.queue_declare(queue="exclusive-queue", exclusive="True")
        try:
            #other connection should not be allowed to declare this:
            c2.queue_declare(queue="exclusive-queue", exclusive="True")
            self.fail("Expected second exclusive queue_declare to raise a channel exception")
        except Closed as e:
            self.assertChannelException(405, e.args[0])

    def test_declare_passive(self):
        """
        Test that the passive field is honoured in queue.declare
        """
        channel = self.channel
        #declare an exclusive queue, then passively declare it (no error expected):
        channel.queue_declare(queue="passive-queue-1", exclusive="True")
        channel.queue_declare(queue="passive-queue-1", passive="True")
        try:
            #passive declaration of a queue that was never declared should fail:
            channel.queue_declare(queue="passive-queue-2", passive="True")
            self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

    def test_bind(self):
        """
        Test various permutations of the queue.bind method
        """
        channel = self.channel
        channel.queue_declare(queue="queue-1", exclusive="True")

        #straightforward case, both exchange & queue exist so no errors expected:
        channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")

        #bind the default queue for the channel (i.e. last one declared):
        channel.queue_bind(exchange="amq.direct", routing_key="key2")

        #use the queue name where neither routing key nor queue are specified:
        channel.queue_bind(exchange="amq.direct")

        #try and bind to non-existent exchange
        try:
            channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
            self.fail("Expected bind to non-existant exchange to fail")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

        #need to reopen a channel:
        channel = self.client.channel(2)
        channel.channel_open()

        #try and bind non-existent queue:
        try:
            channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
            self.fail("Expected bind of non-existant queue to fail")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

    def test_unbind_direct(self):
        """
        Run the shared unbind scenario against the amq.direct exchange
        """
        self.unbind_test(exchange="amq.direct", routing_key="key")

    def test_unbind_topic(self):
        """
        Run the shared unbind scenario against the amq.topic exchange
        """
        self.unbind_test(exchange="amq.topic", routing_key="key")

    def test_unbind_fanout(self):
        """
        Run the shared unbind scenario against the amq.fanout exchange
        """
        self.unbind_test(exchange="amq.fanout")

    def test_unbind_headers(self):
        """
        Run the shared unbind scenario against the amq.match exchange,
        using binding arguments and matching application headers
        """
        self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})

    def unbind_test(self, exchange, routing_key="", args=None, headers=None):
        """
        Shared scenario for the test_unbind_* tests.

        Binds queue-1 and queue-2 to the given exchange, sends a message,
        unbinds queue-1, sends a second message and then checks that
        queue-1 received only the first message while queue-2 received both.

        exchange    -- name of the exchange to bind to and publish through
        routing_key -- routing key used for the bindings and the transfers
        args        -- binding arguments passed to queue_bind/queue_unbind
        headers     -- application headers sent with each message
                       (defaults to an empty dict)
        """
        # Avoid a shared mutable default; an omitted value means "no headers".
        if headers is None:
            headers = {}

        #bind two queues and consume from them
        channel = self.channel

        channel.queue_declare(queue="queue-1", exclusive="True")
        channel.queue_declare(queue="queue-2", exclusive="True")

        channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True)
        channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True)

        queue1 = self.client.queue("queue-1")
        queue2 = self.client.queue("queue-2")

        channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
        channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)

        #send a message that will match both bindings
        channel.message_transfer(destination=exchange, routing_key=routing_key,
                                 application_headers=headers, body="one")

        #unbind first queue
        channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)

        #send another message
        channel.message_transfer(destination=exchange, routing_key=routing_key,
                                 application_headers=headers, body="two")

        #check one queue has both messages and the other has only one
        self.assertEqual("one", queue1.get(timeout=1).body)
        try:
            msg = queue1.get(timeout=1)
            self.fail("Got extra message: %s" % msg.body)
        except Empty:
            pass

        self.assertEqual("one", queue2.get(timeout=1).body)
        self.assertEqual("two", queue2.get(timeout=1).body)
        try:
            msg = queue2.get(timeout=1)
            # Report the body; concatenating the message object itself
            # onto a str would raise TypeError and mask the real failure.
            self.fail("Got extra message: %s" % msg.body)
        except Empty:
            pass

    def test_delete_simple(self):
        """
        Test core queue deletion behaviour
        """
        channel = self.channel

        #straight-forward case:
        channel.queue_declare(queue="delete-me")
        channel.message_transfer(routing_key="delete-me", body="a")
        channel.message_transfer(routing_key="delete-me", body="b")
        channel.message_transfer(routing_key="delete-me", body="c")
        reply = channel.queue_delete(queue="delete-me")
        self.assertEqual(3, reply.message_count)
        #check that it has gone by declaring passively
        try:
            channel.queue_declare(queue="delete-me", passive="True")
            self.fail("Queue has not been deleted")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

        #check attempted deletion of non-existent queue is handled correctly:
        channel = self.client.channel(2)
        channel.channel_open()
        try:
            channel.queue_delete(queue="i-dont-exist", if_empty="True")
            self.fail("Expected delete of non-existant queue to fail")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

    def test_delete_ifempty(self):
        """
        Test that if_empty field of queue_delete is honoured
        """
        channel = self.channel

        #create a queue and add a message to it (use default binding):
        channel.queue_declare(queue="delete-me-2")
        channel.queue_declare(queue="delete-me-2", passive="True")
        channel.message_transfer(routing_key="delete-me-2", body="message")

        #try to delete, but only if empty:
        try:
            channel.queue_delete(queue="delete-me-2", if_empty="True")
            self.fail("Expected delete if_empty to fail for non-empty queue")
        except Closed as e:
            self.assertChannelException(406, e.args[0])

        #need new channel now:
        channel = self.client.channel(2)
        channel.channel_open()

        #empty queue:
        channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True)
        queue = self.client.queue("consumer_tag")
        msg = queue.get(timeout=1)
        self.assertEqual("message", msg.body)
        channel.message_cancel(destination="consumer_tag")

        #retry deletion on empty queue:
        channel.queue_delete(queue="delete-me-2", if_empty="True")

        #check that it has gone by declaring passively:
        try:
            channel.queue_declare(queue="delete-me-2", passive="True")
            self.fail("Queue has not been deleted")
        except Closed as e:
            self.assertChannelException(404, e.args[0])

    def test_delete_ifunused(self):
        """
        Test that if_unused field of queue_delete is honoured
        """
        channel = self.channel

        #create a queue and register a consumer:
        channel.queue_declare(queue="delete-me-3")
        channel.queue_declare(queue="delete-me-3", passive="True")
        channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True)

        #need new channel now:
        channel2 = self.client.channel(2)
        channel2.channel_open()
        #try to delete, but only if unused:
        try:
            channel2.queue_delete(queue="delete-me-3", if_unused="True")
            self.fail("Expected delete if_unused to fail for queue with existing consumer")
        except Closed as e:
            self.assertChannelException(406, e.args[0])

        #cancel the consumer on the original channel, then retry the delete:
        channel.message_cancel(destination="consumer_tag")
        channel.queue_delete(queue="delete-me-3", if_unused="True")
        #check that it has gone by declaring passively:
        try:
            channel.queue_declare(queue="delete-me-3", passive="True")
            self.fail("Queue has not been deleted")
        except Closed as e:
            self.assertChannelException(404, e.args[0])