summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java76
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java244
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes1
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes1
4 files changed, 290 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 74c9878a8e..aadfb44ca0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -126,7 +126,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** The virtual path to connect to on the AMQ server */
private String _virtualHost;
- private ExceptionListener _exceptionListener;
+ /** The exception listener for this connection object. */
+ private volatile ExceptionListener _exceptionListener;
private ConnectionListener _connectionListener;
@@ -784,13 +785,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
-
- return _exceptionListener;
+ return getExceptionListenerNoCheck();
}
public void setExceptionListener(ExceptionListener listener) throws JMSException
{
checkNotClosed();
+
_exceptionListener = listener;
}
@@ -1307,45 +1308,56 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.getProtocolSession().notifyError(je);
}
- // get the failover mutex before trying to close
- synchronized (getFailoverMutex())
+ try
{
- // decide if we are going to close the session
- if (hardError(cause))
+ // get the failover mutex before trying to close
+ synchronized (getFailoverMutex())
{
- closer = (!setClosed()) || closer;
+ // decide if we are going to close the session
+ if (hardError(cause))
{
- _logger.info("Closing AMQConnection due to :" + cause);
+ closer = (!setClosed()) || closer;
+ {
+ _logger.info("Closing AMQConnection due to :" + cause);
+ }
}
- }
- else
- {
- _logger.info("Not a hard-error connection not closing: " + cause);
- }
-
- // deliver the exception if there is a listener
- if (_exceptionListener != null)
- {
- _exceptionListener.onException(je);
- }
- else
- {
- _logger.error("Throwable Received but no listener set: " + cause);
- }
-
- // if we are closing the connection, close sessions first
- if (closer)
- {
- try
+ else
{
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ _logger.info("Not a hard-error connection not closing: " + cause);
}
- catch (JMSException e)
+
+ // if we are closing the connection, close sessions first
+ if (closer)
{
- _logger.error("Error closing all sessions: " + e, e);
+ try
+ {
+ closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing all sessions: " + e, e);
+ }
}
}
}
+ finally
+ {
+ deliverJMSExceptionToExceptionListenerOrLog(je, cause);
+ }
+ }
+
+ private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause)
+ {
+ // deliver the exception if there is a listener
+ ExceptionListener exceptionListener = getExceptionListenerNoCheck();
+ if (exceptionListener != null)
+ {
+ exceptionListener.onException(je);
+ }
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause);
+ }
}
private boolean hardError(Throwable cause)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
new file mode 100644
index 0000000000..141de1e5a8
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExceptionListenerTest extends QpidBrokerTestCase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionListenerTest.class);
+
+ private volatile Throwable _lastExceptionListenerException = null;
+
+ public void testExceptionListenerHearsBrokerShutdown() throws Exception
+ {
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
+ final AtomicInteger exceptionCounter = new AtomicInteger(0);
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ exceptionCounter.incrementAndGet();
+ _lastExceptionListenerException = exception;
+ exceptionReceivedLatch.countDown();
+ }
+ };
+
+ Connection connection = getConnection();
+ connection.setExceptionListener(listener);
+
+ stopBroker();
+
+ exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+
+ assertEquals("Unexpected number of exceptions received", 1, exceptionCounter.intValue());
+ LOGGER.debug("exception was", _lastExceptionListenerException);
+ assertNotNull("Exception should have cause", _lastExceptionListenerException.getCause());
+ Class<? extends Exception> expectedExceptionClass = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class;
+ assertEquals(expectedExceptionClass, _lastExceptionListenerException.getCause().getClass());
+ }
+
+ /**
+ * It is reasonable for an application to perform Connection#close within the exception
+ * listener. This test verifies that close is allowed, and proceeds without generating
+ * further exceptions.
+ */
+ public void testExceptionListenerClosesConnection_IsAllowed() throws Exception
+ {
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
+ final Connection connection = getConnection();
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ connection.close();
+ // PASS
+ }
+ catch (Throwable t)
+ {
+ _lastExceptionListenerException = t;
+ }
+ finally
+ {
+ exceptionReceivedLatch.countDown();
+ }
+ }
+ };
+ connection.setExceptionListener(listener);
+
+
+ stopBroker();
+
+ boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
+ assertNull("Connection#close() should not have thrown exception", _lastExceptionListenerException);
+ }
+
+ /**
+ * Spring's SingleConnectionFactory installs an ExceptionListener that calls stop()
+ * and ignores any IllegalStateException that result. This test serves to test this
+ * scenario.
+ */
+ public void testExceptionListenerStopsConnection_ThrowsIllegalStateException() throws Exception
+ {
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
+ final Connection connection = getConnection();
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ connection.stop();
+ fail("Exception not thrown");
+ }
+ catch (IllegalStateException ise)
+ {
+ // PASS
+ }
+ catch (Throwable t)
+ {
+ _lastExceptionListenerException = t;
+ }
+ finally
+ {
+ exceptionReceivedLatch.countDown();
+ }
+ }
+ };
+ connection.setExceptionListener(listener);
+
+ stopBroker();
+
+ boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
+ assertNull("Connection#stop() should not have thrown unexpected exception", _lastExceptionListenerException);
+ }
+
+ /**
+ * This test reproduces a deadlock that was the subject of a support call. A Spring based
+ * application was using SingleConnectionFactory. It installed an ExceptionListener that
+ * stops and closes the connection in response to any exception. On receipt of a message
+ * the application would create a new session then send a response message (within onMessage).
+ * It appears that a misconfiguration in the application meant that some of these messages
+ * were bounced (no-route). Bounces are treated like connection exceptions and are passed
+ * back to the application via the ExceptionListener. The deadlock occurred between the
+ * ExceptionListener's call to stop() and the MessageListener's attempt to create a new
+ * session.
+ */
+ public void testExceptionListenerConnectionStopDeadlock() throws Exception
+ {
+ Queue messageQueue = getTestQueue();
+
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false));
+
+ final Connection connection = getConnectionWithOptions(options);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ session.createConsumer(messageQueue).close(); // Create queue by side-effect
+
+ // Put 10 messages onto messageQueue
+ sendMessage(session, messageQueue, 10);
+
+ // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException.
+ // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock,
+ // at least on my machine).
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2);
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ assertNotNull("JMS Exception must have cause", exception.getCause() );
+ assertEquals("JMS Exception is of wrong type", AMQNoRouteException.class, exception.getCause().getClass());
+ exceptionReceivedLatch.countDown();
+ if (exceptionReceivedLatch.getCount() == 0)
+ {
+ connection.stop(); // ** Deadlock
+ connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _lastExceptionListenerException = t;
+ }
+ }
+ };
+ connection.setExceptionListener(listener);
+
+ // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus
+ // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener).
+ final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");;
+ MessageListener redirectingMessageListener = new MessageListener()
+ {
+ @Override
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED); // ** Deadlock
+ mlSession.createProducer(unknownQueue).send(msg);
+ mlSession.commit();
+ }
+ catch (JMSException je)
+ {
+ // Connection is closed by the listener, so exceptions here are expected.
+ LOGGER.debug("Expected exception - message listener got exception", je);
+ }
+ }
+ };
+
+ MessageConsumer consumer = session.createConsumer(messageQueue);
+ consumer.setMessageListener(redirectingMessageListener);
+ connection.start();
+
+ // Await the 2nd exception
+ boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
+ assertNull("Exception listener should not have had experienced exception", _lastExceptionListenerException);
+ }
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index a75d96fb85..8ea592daf9 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -62,6 +62,7 @@ org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
// c++ broker doesn't support message bouncing
org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable
+org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionListenerConnectionStopDeadlock
// c++ broker expires messages on delivery or when the queue cleaner thread runs.
org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index 055ce25d75..02c2db1134 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -20,6 +20,7 @@
// Those tests are testing 0.8..-0-9-1 specific semantics
org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#*
org.apache.qpid.test.client.CloseOnNoRouteForMandatoryMessageTest#*
+org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionListenerConnectionStopDeadlock
org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute
//this test checks explicitly for 0-8 flow control semantics