diff options
| author | Alex Rudyy <orudyy@apache.org> | 2013-06-21 17:06:57 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2013-06-21 17:06:57 +0000 |
| commit | e409124b9f3a7423fe4ab04e7ce3e446244d04e3 (patch) | |
| tree | 95eb9be13518f19536314f7c0993fe40d84c70c9 /qpid/java/systests/src | |
| parent | 8bdb080ef1f4afb1727dc3fc5f2666bdfd982107 (diff) | |
| download | qpid-python-e409124b9f3a7423fe4ab04e7ce3e446244d04e3.tar.gz | |
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
8 files changed, 488 insertions, 80 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java index 8324ac74a5..00c85e80c8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java @@ -19,6 +19,8 @@ package org.apache.qpid.server.security.acl; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; @@ -306,7 +308,11 @@ public class ExternalACLTest extends AbstractACLTestCase conn.start(); - MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue")); + Queue queue = sess.createQueue("example.RequestQueue"); + + ((AMQSession<?,?>)sess).declareAndBind((AMQDestination)queue); + + MessageProducer sender = sess.createProducer(queue); sender.send(sess.createTextMessage("test")); @@ -316,7 +322,6 @@ public class ExternalACLTest extends AbstractACLTestCase conn.close(); } - public void setUpRequestResponseSuccess() throws Exception { // The group "messaging-users", referenced in the ACL below, is currently defined diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java index 99ee99e8c5..cc662bddca 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java @@ -14,6 +14,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -40,6 +41,9 @@ public class ExchangeManagementTest extends QpidBrokerTestCase { getBrokerConfiguration().addJmxManagementConfiguration(); + // to test exchange selectors the publishing of unroutable messages should be allowed + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + _jmxUtils = new JMXTestUtils(this); super.setUp(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java index 0d40eca745..1d2f6e3427 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java @@ -27,6 +27,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator; import org.apache.qpid.server.model.Broker; @@ -35,7 +40,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.client.UnroutableMessageTestExceptionListener; import org.apache.qpid.test.utils.TestBrokerConfiguration; public class BrokerRestTest extends QpidRestTestCase @@ -160,6 +165,33 @@ public class BrokerRestTest extends QpidRestTestCase assertEquals("Unexpected update response for flow resume size > flow size", 409, response); } + public void testSetCloseOnNoRoute() throws Exception + { + Map<String, Object> brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); + assertTrue("closeOnNoRoute should be true", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)); + + Map<String, Object> brokerAttributes = new HashMap<String, Object>(); + brokerAttributes.put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + + int response = getRestTestHelper().submitRequest("/rest/broker", "PUT", brokerAttributes); + assertEquals("Unexpected update response", 200, response); + + brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); + assertFalse("closeOnNoRoute should be false", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)); + + Connection connection = getConnection(); + UnroutableMessageTestExceptionListener exceptionListener = new UnroutableMessageTestExceptionListener(); + connection.setExceptionListener(exceptionListener); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(getTestQueue()); + TextMessage message = session.createTextMessage("Test"); + producer.send(message); + + session.commit(); + + exceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); + } + private Map<String, Object> getValidBrokerAttributes() { Map<String, Object> brokerAttributes = new HashMap<String, Object>(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java new file mode 100644 index 0000000000..a8a72c20fc --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.client; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Tests the broker's connection-closing behaviour when it receives an unroutable message + * on a transactional session. + * + * @see ImmediateAndMandatoryPublishingTest for more general tests of mandatory and immediate publishing + */ +public class CloseOnNoRouteForMandatoryMessageTest extends QpidBrokerTestCase +{ + private Connection _connection; + private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener(); + + @Override + public void setUp() throws Exception + { + super.setUp(); + } + + public void testNoRoute_brokerClosesConnection() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + String testQueueName = getTestQueueName(); + MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer( + transactedSession.createQueue(testQueueName), + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + try + { + transactedSession.commit(); + fail("Expected exception not thrown"); + } + catch(JMSException e) + { + _testExceptionListener.assertNoRoute(e, testQueueName); + } + _testExceptionListener.assertReceivedNoRoute(testQueueName); + + forgetConnection(_connection); + } + + public void testCloseOnNoRouteWhenExceptionMessageLengthIsGreater255() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED); + + StringBuilder longExchangeName = getLongExchangeName(); + + AMQShortString exchangeName = new AMQShortString(longExchangeName.toString()); + transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false); + + Destination testQueue = new AMQQueue(exchangeName, getTestQueueName()); + MessageProducer mandatoryProducer = transactedSession.createProducer( + testQueue, + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + try + { + transactedSession.commit(); + fail("Expected exception not thrown"); + } + catch (JMSException e) + { + AMQException noRouteException = (AMQException) e.getLinkedException(); + assertNotNull("AMQException should be linked to JMSException", noRouteException); + + assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode()); + String expectedMessage = "Error: No route for message [Exchange: " + longExchangeName.substring(0, 220) + "..."; + assertEquals("Unexpected exception message: " + noRouteException.getMessage(), expectedMessage, + noRouteException.getMessage()); + } + finally + { + forgetConnection(_connection); + } + } + + public void testNoRouteMessageReurnedWhenExceptionMessageLengthIsGreater255() throws Exception + { + createConnectionWithCloseWhenNoRoute(false); + + AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED); + + StringBuilder longExchangeName = getLongExchangeName(); + + AMQShortString exchangeName = new AMQShortString(longExchangeName.toString()); + transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false); + + AMQQueue testQueue = new AMQQueue(exchangeName, getTestQueueName()); + MessageProducer mandatoryProducer = transactedSession.createProducer( + testQueue, + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + transactedSession.commit(); + _testExceptionListener.assertReceivedReturnedMessageWithLongExceptionMessage(message, testQueue); + } + + private StringBuilder getLongExchangeName() + { + StringBuilder longExchangeName = new StringBuilder(); + for (int i = 0; i < 50; i++) + { + longExchangeName.append("abcde"); + } + return longExchangeName; + } + + public void testNoRouteForNonMandatoryMessage_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + String testQueueName = getTestQueueName(); + MessageProducer nonMandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer( + transactedSession.createQueue(testQueueName), + false, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + nonMandatoryProducer.send(message); + + // should succeed - the message is simply discarded + transactedSession.commit(); + + _testExceptionListener.assertNoException(); + } + + + public void testNoRouteOnNonTransactionalSession_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + Session nonTransactedSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String testQueueName = getTestQueueName(); + MessageProducer mandatoryProducer = ((AMQSession<?, ?>) nonTransactedSession).createProducer( + nonTransactedSession.createQueue(testQueueName), + true, // mandatory + false); // immediate + + Message message = nonTransactedSession.createMessage(); + mandatoryProducer.send(message); + + // should succeed - the message is asynchronously bounced back to the exception listener + message.acknowledge(); + + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); + } + + public void testClientDisablesCloseOnNoRoute_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception + { + createConnectionWithCloseWhenNoRoute(false); + + Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + String testQueueName = getTestQueueName(); + MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer( + transactedSession.createQueue(testQueueName), + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + transactedSession.commit(); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); + } + + private void createConnectionWithCloseWhenNoRoute(boolean closeWhenNoRoute) throws URLSyntaxException, NamingException, JMSException + { + Map<String, String> options = new HashMap<String, String>(); + options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(closeWhenNoRoute)); + _connection = getConnectionWithOptions(options); + _connection.setExceptionListener(_testExceptionListener); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java index b746a5b09e..d012b9abbb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java @@ -18,34 +18,34 @@ */ package org.apache.qpid.test.client; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; + import org.apache.qpid.client.AMQSession; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.QpidBrokerTestCase; -public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase implements ExceptionListener +/** + * @see CloseOnNoRouteForMandatoryMessageTest for related tests + */ +public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase { private Connection _connection; - private BlockingQueue<JMSException> _exceptions; + private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener(); + @Override public void setUp() throws Exception { + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); super.setUp(); - _exceptions = new ArrayBlockingQueue<JMSException>(1); _connection = getConnection(); - _connection.setExceptionListener(this); + _connection.setExceptionListener(_testExceptionListener); } public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception @@ -103,14 +103,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl consumerCreateAndClose(true, false); Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true); - - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); } private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub) @@ -120,27 +113,14 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl Message message = produceMessage(acknowledgeMode, pubSub, false, true); - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoConsumersException noConsumerException = (AMQNoConsumersException) exception.getLinkedException(); - assertNotNull("AMQNoConsumersException should be linked to JMSEXception", noConsumerException); - Message bounceMessage = (Message) noConsumerException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoConsumersWithReturnedMessage(message); } private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException, InterruptedException { Message message = produceMessage(acknowledgeMode, pubSub, true, false); - - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); } private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException, @@ -148,8 +128,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl { produceMessage(acknowledgeMode, pubSub, false, false); - JMSException exception = _exceptions.poll(1, TimeUnit.SECONDS); - assertNull("Unexpected JMSException", exception); + _testExceptionListener.assertNoException(); } private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException @@ -210,41 +189,26 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl Message message = session.createMessage(); producer.send(message); - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); producer = session.createProducer(null); message = session.createMessage(); producer.send(session.createQueue(getTestQueueName()), message); - exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); - + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); // publish to non-existent topic - should get no failure producer = session.createProducer(session.createTopic(getTestQueueName())); message = session.createMessage(); producer.send(message); - exception = _exceptions.poll(1, TimeUnit.SECONDS); - assertNull("Unexpected JMSException", exception); + _testExceptionListener.assertNoException(); producer = session.createProducer(null); message = session.createMessage(); producer.send(session.createTopic(getTestQueueName()), message); - exception = _exceptions.poll(1, TimeUnit.SECONDS); - assertNull("Unexpected JMSException", exception); + _testExceptionListener.assertNoException(); session.close(); } @@ -260,13 +224,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl Message message = session.createMessage(); producer.send(message); - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); // now set topic specific system property to false - should no longer get mandatory failure on new producer setTestClientSystemProperty("qpid.default_mandatory_topic","false"); @@ -274,17 +232,6 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl message = session.createMessage(); producer.send(session.createTopic(getTestQueueName()), message); - exception = _exceptions.poll(1, TimeUnit.SECONDS); - if(exception != null) - { - exception.printStackTrace(); - } - assertNull("Unexpected JMSException", exception); - - } - - public void onException(JMSException exception) - { - _exceptions.add(exception); + _testExceptionListener.assertNoException(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java new file mode 100644 index 0000000000..7ba0bd17f3 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.client; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.protocol.AMQConstant; + +/** + * Provides utility methods for checking exceptions that are thrown on the client side when a message is + * not routable. + * + * Exception objects are passed either explicitly as method parameters or implicitly + * by previously doing {@link Connection#setExceptionListener(ExceptionListener)}. + */ +public class UnroutableMessageTestExceptionListener implements ExceptionListener +{ + private static final Logger _logger = Logger.getLogger(UnroutableMessageTestExceptionListener.class); + + /** + * Number of seconds to check for an event that should should NOT happen + */ + private static final int NEGATIVE_TIMEOUT = 2; + + /** + * Number of seconds to keep checking for an event that should should happen + */ + private static final int POSITIVE_TIMEOUT = 30; + + private BlockingQueue<JMSException> _exceptions = new ArrayBlockingQueue<JMSException>(1); + + @Override + public void onException(JMSException e) + { + _logger.info("Received exception " + e); + _exceptions.add(e); + } + + public void assertReceivedNoRouteWithReturnedMessage(Message message, String intendedQueueName) + { + JMSException exception = getReceivedException(); + assertNoRouteExceptionWithReturnedMessage(exception, message, intendedQueueName); + } + + public void assertReceivedNoRoute(String intendedQueueName) + { + JMSException exception = getReceivedException(); + assertNoRoute(exception, intendedQueueName); + } + + public void assertReceivedNoConsumersWithReturnedMessage(Message message) + { + JMSException exception = getReceivedException(); + AMQNoConsumersException noConsumersException = (AMQNoConsumersException) exception.getLinkedException(); + assertNotNull("AMQNoConsumersException should be linked to JMSException", noConsumersException); + Message bounceMessage = (Message) noConsumersException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + + try + { + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + catch (JMSException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } + + public void assertReceivedReturnedMessageWithLongExceptionMessage(Message message, AMQQueue queue) + { + JMSException exception = getReceivedException(); + assertNoRouteException(exception, message); + AMQShortString exchangeName = queue.getExchangeName(); + String expectedMessage = "Error: No Route for message [Exchange: " + exchangeName.asString().substring(0, 220) + "..."; + assertTrue("Unexpected exception message: " + exception.getMessage(), exception.getMessage().contains(expectedMessage)); + } + + public void assertNoRouteExceptionWithReturnedMessage( + JMSException exception, Message message, String intendedQueueName) + { + assertNoRoute(exception, intendedQueueName); + + assertNoRouteException(exception, message); + } + + private void assertNoRouteException(JMSException exception, Message message) + { + AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSException", noRouteException); + Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + + try + { + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + catch (JMSException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } + + public void assertNoRoute(JMSException exception, String intendedQueueName) + { + assertTrue( + exception + " message should contain intended queue name", + exception.getMessage().contains(intendedQueueName)); + + AMQException noRouteException = (AMQException) exception.getLinkedException(); + assertNotNull("AMQException should be linked to JMSException", noRouteException); + + assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode()); + assertTrue( + "Linked exception " + noRouteException + " message should contain intended queue name", + noRouteException.getMessage().contains(intendedQueueName)); + } + + + public void assertNoException() + { + try + { + assertNull("Unexpected JMSException", _exceptions.poll(NEGATIVE_TIMEOUT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } + + private JMSException getReceivedException() + { + try + { + JMSException exception = _exceptions.poll(POSITIVE_TIMEOUT, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + return exception; + } + catch(InterruptedException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index 4dc26847da..f5a234163d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -26,6 +26,8 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; +import org.apache.qpid.server.model.Broker; + /** * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration * is set for a virtual host. @@ -39,6 +41,9 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase protected void configure() throws Exception { + // switch off connection close in order to test timeout on publishing of unroutable messages + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + // Setup housekeeping every 100ms setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index e61efd3e32..67e3e3b8f1 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -1123,7 +1123,7 @@ public class QpidBrokerTestCase extends QpidTestCase public Connection getConnection(ConnectionURL url) throws JMSException { - _logger.info(url.getURL()); + _logger.debug("get connection for " + url.getURL()); Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword()); _connections.add(connection); @@ -1143,16 +1143,16 @@ public class QpidBrokerTestCase extends QpidTestCase */ public Connection getConnection(String username, String password) throws JMSException, NamingException { - _logger.info("get connection"); + _logger.debug("get connection for username " + username); Connection con = getConnectionFactory().createConnection(username, password); //add the connection in the list of connections _connections.add(con); return con; } - public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException + protected Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException { - _logger.info("get Connection"); + _logger.debug("get connection for id " + id); Connection con = getConnectionFactory().createConnection(username, password, id); //add the connection in the list of connections _connections.add(con); @@ -1160,6 +1160,19 @@ public class QpidBrokerTestCase extends QpidTestCase } /** + * Useful, for example, to avoid the connection being automatically closed in {@link #tearDown()} + * if it has deliberately been put into an error state already. + */ + protected void forgetConnection(Connection connection) + { + _logger.debug("Forgetting about connection " + connection); + boolean removed = _connections.remove(connection); + assertTrue( + "The supplied connection " + connection + " should have been one that I already know about", + removed); + } + + /** * Return a uniqueName for this test. * In this case it returns a queue Named by the TestCase and TestName * @@ -1190,6 +1203,7 @@ public class QpidBrokerTestCase extends QpidTestCase return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName()); } + @Override protected void tearDown() throws java.lang.Exception { super.tearDown(); |
