summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
commite409124b9f3a7423fe4ab04e7ce3e446244d04e3 (patch)
tree95eb9be13518f19536314f7c0993fe40d84c70c9 /qpid/java/systests/src
parent8bdb080ef1f4afb1727dc3fc5f2666bdfd982107 (diff)
downloadqpid-python-e409124b9f3a7423fe4ab04e7ce3e446244d04e3.tar.gz
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java34
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java223
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java93
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java178
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java5
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java22
8 files changed, 488 insertions, 80 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
index 8324ac74a5..00c85e80c8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
@@ -19,6 +19,8 @@
package org.apache.qpid.server.security.acl;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
@@ -306,7 +308,11 @@ public class ExternalACLTest extends AbstractACLTestCase
conn.start();
- MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue"));
+ Queue queue = sess.createQueue("example.RequestQueue");
+
+ ((AMQSession<?,?>)sess).declareAndBind((AMQDestination)queue);
+
+ MessageProducer sender = sess.createProducer(queue);
sender.send(sess.createTextMessage("test"));
@@ -316,7 +322,6 @@ public class ExternalACLTest extends AbstractACLTestCase
conn.close();
}
-
public void setUpRequestResponseSuccess() throws Exception
{
// The group "messaging-users", referenced in the ACL below, is currently defined
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
index 99ee99e8c5..cc662bddca 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
@@ -14,6 +14,7 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -40,6 +41,9 @@ public class ExchangeManagementTest extends QpidBrokerTestCase
{
getBrokerConfiguration().addJmxManagementConfiguration();
+ // to test exchange selectors the publishing of unroutable messages should be allowed
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+
_jmxUtils = new JMXTestUtils(this);
super.setUp();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
index 0d40eca745..1d2f6e3427 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
@@ -27,6 +27,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
import org.apache.qpid.server.model.Broker;
@@ -35,7 +40,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.client.UnroutableMessageTestExceptionListener;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class BrokerRestTest extends QpidRestTestCase
@@ -160,6 +165,33 @@ public class BrokerRestTest extends QpidRestTestCase
assertEquals("Unexpected update response for flow resume size > flow size", 409, response);
}
+ public void testSetCloseOnNoRoute() throws Exception
+ {
+ Map<String, Object> brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
+ assertTrue("closeOnNoRoute should be true", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE));
+
+ Map<String, Object> brokerAttributes = new HashMap<String, Object>();
+ brokerAttributes.put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+
+ int response = getRestTestHelper().submitRequest("/rest/broker", "PUT", brokerAttributes);
+ assertEquals("Unexpected update response", 200, response);
+
+ brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
+ assertFalse("closeOnNoRoute should be false", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE));
+
+ Connection connection = getConnection();
+ UnroutableMessageTestExceptionListener exceptionListener = new UnroutableMessageTestExceptionListener();
+ connection.setExceptionListener(exceptionListener);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(getTestQueue());
+ TextMessage message = session.createTextMessage("Test");
+ producer.send(message);
+
+ session.commit();
+
+ exceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
private Map<String, Object> getValidBrokerAttributes()
{
Map<String, Object> brokerAttributes = new HashMap<String, Object>();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java
new file mode 100644
index 0000000000..a8a72c20fc
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Tests the broker's connection-closing behaviour when it receives an unroutable message
+ * on a transactional session.
+ *
+ * @see ImmediateAndMandatoryPublishingTest for more general tests of mandatory and immediate publishing
+ */
+public class CloseOnNoRouteForMandatoryMessageTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener();
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void testNoRoute_brokerClosesConnection() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ try
+ {
+ transactedSession.commit();
+ fail("Expected exception not thrown");
+ }
+ catch(JMSException e)
+ {
+ _testExceptionListener.assertNoRoute(e, testQueueName);
+ }
+ _testExceptionListener.assertReceivedNoRoute(testQueueName);
+
+ forgetConnection(_connection);
+ }
+
+ public void testCloseOnNoRouteWhenExceptionMessageLengthIsGreater255() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ StringBuilder longExchangeName = getLongExchangeName();
+
+ AMQShortString exchangeName = new AMQShortString(longExchangeName.toString());
+ transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false);
+
+ Destination testQueue = new AMQQueue(exchangeName, getTestQueueName());
+ MessageProducer mandatoryProducer = transactedSession.createProducer(
+ testQueue,
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ try
+ {
+ transactedSession.commit();
+ fail("Expected exception not thrown");
+ }
+ catch (JMSException e)
+ {
+ AMQException noRouteException = (AMQException) e.getLinkedException();
+ assertNotNull("AMQException should be linked to JMSException", noRouteException);
+
+ assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode());
+ String expectedMessage = "Error: No route for message [Exchange: " + longExchangeName.substring(0, 220) + "...";
+ assertEquals("Unexpected exception message: " + noRouteException.getMessage(), expectedMessage,
+ noRouteException.getMessage());
+ }
+ finally
+ {
+ forgetConnection(_connection);
+ }
+ }
+
+ public void testNoRouteMessageReurnedWhenExceptionMessageLengthIsGreater255() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(false);
+
+ AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ StringBuilder longExchangeName = getLongExchangeName();
+
+ AMQShortString exchangeName = new AMQShortString(longExchangeName.toString());
+ transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false);
+
+ AMQQueue testQueue = new AMQQueue(exchangeName, getTestQueueName());
+ MessageProducer mandatoryProducer = transactedSession.createProducer(
+ testQueue,
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ transactedSession.commit();
+ _testExceptionListener.assertReceivedReturnedMessageWithLongExceptionMessage(message, testQueue);
+ }
+
+ private StringBuilder getLongExchangeName()
+ {
+ StringBuilder longExchangeName = new StringBuilder();
+ for (int i = 0; i < 50; i++)
+ {
+ longExchangeName.append("abcde");
+ }
+ return longExchangeName;
+ }
+
+ public void testNoRouteForNonMandatoryMessage_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer nonMandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ false, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ nonMandatoryProducer.send(message);
+
+ // should succeed - the message is simply discarded
+ transactedSession.commit();
+
+ _testExceptionListener.assertNoException();
+ }
+
+
+ public void testNoRouteOnNonTransactionalSession_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ Session nonTransactedSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) nonTransactedSession).createProducer(
+ nonTransactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = nonTransactedSession.createMessage();
+ mandatoryProducer.send(message);
+
+ // should succeed - the message is asynchronously bounced back to the exception listener
+ message.acknowledge();
+
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
+ public void testClientDisablesCloseOnNoRoute_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(false);
+
+ Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ transactedSession.commit();
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
+ private void createConnectionWithCloseWhenNoRoute(boolean closeWhenNoRoute) throws URLSyntaxException, NamingException, JMSException
+ {
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(closeWhenNoRoute));
+ _connection = getConnectionWithOptions(options);
+ _connection.setExceptionListener(_testExceptionListener);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
index b746a5b09e..d012b9abbb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
@@ -18,34 +18,34 @@
*/
package org.apache.qpid.test.client;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
+
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * @see CloseOnNoRouteForMandatoryMessageTest for related tests
+ */
+public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase
{
private Connection _connection;
- private BlockingQueue<JMSException> _exceptions;
+ private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener();
+ @Override
public void setUp() throws Exception
{
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
super.setUp();
- _exceptions = new ArrayBlockingQueue<JMSException>(1);
_connection = getConnection();
- _connection.setExceptionListener(this);
+ _connection.setExceptionListener(_testExceptionListener);
}
public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception
@@ -103,14 +103,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
consumerCreateAndClose(true, false);
Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true);
-
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
}
private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub)
@@ -120,27 +113,14 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
Message message = produceMessage(acknowledgeMode, pubSub, false, true);
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoConsumersException noConsumerException = (AMQNoConsumersException) exception.getLinkedException();
- assertNotNull("AMQNoConsumersException should be linked to JMSEXception", noConsumerException);
- Message bounceMessage = (Message) noConsumerException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoConsumersWithReturnedMessage(message);
}
private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException,
InterruptedException
{
Message message = produceMessage(acknowledgeMode, pubSub, true, false);
-
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
}
private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException,
@@ -148,8 +128,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
{
produceMessage(acknowledgeMode, pubSub, false, false);
- JMSException exception = _exceptions.poll(1, TimeUnit.SECONDS);
- assertNull("Unexpected JMSException", exception);
+ _testExceptionListener.assertNoException();
}
private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException
@@ -210,41 +189,26 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
Message message = session.createMessage();
producer.send(message);
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
producer = session.createProducer(null);
message = session.createMessage();
producer.send(session.createQueue(getTestQueueName()), message);
- exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
-
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
// publish to non-existent topic - should get no failure
producer = session.createProducer(session.createTopic(getTestQueueName()));
message = session.createMessage();
producer.send(message);
- exception = _exceptions.poll(1, TimeUnit.SECONDS);
- assertNull("Unexpected JMSException", exception);
+ _testExceptionListener.assertNoException();
producer = session.createProducer(null);
message = session.createMessage();
producer.send(session.createTopic(getTestQueueName()), message);
- exception = _exceptions.poll(1, TimeUnit.SECONDS);
- assertNull("Unexpected JMSException", exception);
+ _testExceptionListener.assertNoException();
session.close();
}
@@ -260,13 +224,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
Message message = session.createMessage();
producer.send(message);
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
// now set topic specific system property to false - should no longer get mandatory failure on new producer
setTestClientSystemProperty("qpid.default_mandatory_topic","false");
@@ -274,17 +232,6 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
message = session.createMessage();
producer.send(session.createTopic(getTestQueueName()), message);
- exception = _exceptions.poll(1, TimeUnit.SECONDS);
- if(exception != null)
- {
- exception.printStackTrace();
- }
- assertNull("Unexpected JMSException", exception);
-
- }
-
- public void onException(JMSException exception)
- {
- _exceptions.add(exception);
+ _testExceptionListener.assertNoException();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java
new file mode 100644
index 0000000000..7ba0bd17f3
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.client;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * Provides utility methods for checking exceptions that are thrown on the client side when a message is
+ * not routable.
+ *
+ * Exception objects are passed either explicitly as method parameters or implicitly
+ * by previously doing {@link Connection#setExceptionListener(ExceptionListener)}.
+ */
+public class UnroutableMessageTestExceptionListener implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(UnroutableMessageTestExceptionListener.class);
+
+ /**
+ * Number of seconds to check for an event that should should NOT happen
+ */
+ private static final int NEGATIVE_TIMEOUT = 2;
+
+ /**
+ * Number of seconds to keep checking for an event that should should happen
+ */
+ private static final int POSITIVE_TIMEOUT = 30;
+
+ private BlockingQueue<JMSException> _exceptions = new ArrayBlockingQueue<JMSException>(1);
+
+ @Override
+ public void onException(JMSException e)
+ {
+ _logger.info("Received exception " + e);
+ _exceptions.add(e);
+ }
+
+ public void assertReceivedNoRouteWithReturnedMessage(Message message, String intendedQueueName)
+ {
+ JMSException exception = getReceivedException();
+ assertNoRouteExceptionWithReturnedMessage(exception, message, intendedQueueName);
+ }
+
+ public void assertReceivedNoRoute(String intendedQueueName)
+ {
+ JMSException exception = getReceivedException();
+ assertNoRoute(exception, intendedQueueName);
+ }
+
+ public void assertReceivedNoConsumersWithReturnedMessage(Message message)
+ {
+ JMSException exception = getReceivedException();
+ AMQNoConsumersException noConsumersException = (AMQNoConsumersException) exception.getLinkedException();
+ assertNotNull("AMQNoConsumersException should be linked to JMSException", noConsumersException);
+ Message bounceMessage = (Message) noConsumersException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+
+ try
+ {
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+
+ public void assertReceivedReturnedMessageWithLongExceptionMessage(Message message, AMQQueue queue)
+ {
+ JMSException exception = getReceivedException();
+ assertNoRouteException(exception, message);
+ AMQShortString exchangeName = queue.getExchangeName();
+ String expectedMessage = "Error: No Route for message [Exchange: " + exchangeName.asString().substring(0, 220) + "...";
+ assertTrue("Unexpected exception message: " + exception.getMessage(), exception.getMessage().contains(expectedMessage));
+ }
+
+ public void assertNoRouteExceptionWithReturnedMessage(
+ JMSException exception, Message message, String intendedQueueName)
+ {
+ assertNoRoute(exception, intendedQueueName);
+
+ assertNoRouteException(exception, message);
+ }
+
+ private void assertNoRouteException(JMSException exception, Message message)
+ {
+ AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
+ assertNotNull("AMQNoRouteException should be linked to JMSException", noRouteException);
+ Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+
+ try
+ {
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+
+ public void assertNoRoute(JMSException exception, String intendedQueueName)
+ {
+ assertTrue(
+ exception + " message should contain intended queue name",
+ exception.getMessage().contains(intendedQueueName));
+
+ AMQException noRouteException = (AMQException) exception.getLinkedException();
+ assertNotNull("AMQException should be linked to JMSException", noRouteException);
+
+ assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode());
+ assertTrue(
+ "Linked exception " + noRouteException + " message should contain intended queue name",
+ noRouteException.getMessage().contains(intendedQueueName));
+ }
+
+
+ public void assertNoException()
+ {
+ try
+ {
+ assertNull("Unexpected JMSException", _exceptions.poll(NEGATIVE_TIMEOUT, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+
+ private JMSException getReceivedException()
+ {
+ try
+ {
+ JMSException exception = _exceptions.poll(POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+ assertNotNull("JMSException is expected", exception);
+ return exception;
+ }
+ catch(InterruptedException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
index 4dc26847da..f5a234163d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -26,6 +26,8 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import org.apache.qpid.server.model.Broker;
+
/**
* This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
* is set for a virtual host.
@@ -39,6 +41,9 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase
protected void configure() throws Exception
{
+ // switch off connection close in order to test timeout on publishing of unroutable messages
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+
// Setup housekeeping every 100ms
setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index e61efd3e32..67e3e3b8f1 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -1123,7 +1123,7 @@ public class QpidBrokerTestCase extends QpidTestCase
public Connection getConnection(ConnectionURL url) throws JMSException
{
- _logger.info(url.getURL());
+ _logger.debug("get connection for " + url.getURL());
Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword());
_connections.add(connection);
@@ -1143,16 +1143,16 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
- _logger.info("get connection");
+ _logger.debug("get connection for username " + username);
Connection con = getConnectionFactory().createConnection(username, password);
//add the connection in the list of connections
_connections.add(con);
return con;
}
- public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
+ protected Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
{
- _logger.info("get Connection");
+ _logger.debug("get connection for id " + id);
Connection con = getConnectionFactory().createConnection(username, password, id);
//add the connection in the list of connections
_connections.add(con);
@@ -1160,6 +1160,19 @@ public class QpidBrokerTestCase extends QpidTestCase
}
/**
+ * Useful, for example, to avoid the connection being automatically closed in {@link #tearDown()}
+ * if it has deliberately been put into an error state already.
+ */
+ protected void forgetConnection(Connection connection)
+ {
+ _logger.debug("Forgetting about connection " + connection);
+ boolean removed = _connections.remove(connection);
+ assertTrue(
+ "The supplied connection " + connection + " should have been one that I already know about",
+ removed);
+ }
+
+ /**
* Return a uniqueName for this test.
* In this case it returns a queue Named by the TestCase and TestName
*
@@ -1190,6 +1203,7 @@ public class QpidBrokerTestCase extends QpidTestCase
return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName());
}
+ @Override
protected void tearDown() throws java.lang.Exception
{
super.tearDown();