diff options
Diffstat (limited to 'qpid/java')
4 files changed, 290 insertions, 32 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 74c9878a8e..aadfb44ca0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -126,7 +126,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** The virtual path to connect to on the AMQ server */ private String _virtualHost; - private ExceptionListener _exceptionListener; + /** The exception listener for this connection object. */ + private volatile ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; @@ -784,13 +785,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); - - return _exceptionListener; + return getExceptionListenerNoCheck(); } public void setExceptionListener(ExceptionListener listener) throws JMSException { checkNotClosed(); + _exceptionListener = listener; } @@ -1307,45 +1308,56 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().notifyError(je); } - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + try { - // decide if we are going to close the session - if (hardError(cause)) + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { - closer = (!setClosed()) || closer; + // decide if we are going to close the session + if (hardError(cause)) { - _logger.info("Closing AMQConnection due to :" + cause); + closer = (!setClosed()) || closer; + { + _logger.info("Closing AMQConnection due to :" + cause); + } } - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause); - } - - // deliver the exception if there is a listener - if (_exceptionListener != null) - { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause); - } - - // if we are closing the connection, close sessions first - if (closer) - { - try + else { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + _logger.info("Not a hard-error connection not closing: " + cause); } - catch (JMSException e) + + // if we are closing the connection, close sessions first + if (closer) { - _logger.error("Error closing all sessions: " + e, e); + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } } } } + finally + { + deliverJMSExceptionToExceptionListenerOrLog(je, cause); + } + } + + private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) + { + // deliver the exception if there is a listener + ExceptionListener exceptionListener = getExceptionListenerNoCheck(); + if (exceptionListener != null) + { + exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } } private boolean hardError(Throwable cause) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java new file mode 100644 index 0000000000..141de1e5a8 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.connection; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.ConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExceptionListenerTest extends QpidBrokerTestCase +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionListenerTest.class); + + private volatile Throwable _lastExceptionListenerException = null; + + public void testExceptionListenerHearsBrokerShutdown() throws Exception + { + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1); + final AtomicInteger exceptionCounter = new AtomicInteger(0); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + exceptionCounter.incrementAndGet(); + _lastExceptionListenerException = exception; + exceptionReceivedLatch.countDown(); + } + }; + + Connection connection = getConnection(); + connection.setExceptionListener(listener); + + stopBroker(); + + exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + + assertEquals("Unexpected number of exceptions received", 1, exceptionCounter.intValue()); + LOGGER.debug("exception was", _lastExceptionListenerException); + assertNotNull("Exception should have cause", _lastExceptionListenerException.getCause()); + Class<? extends Exception> expectedExceptionClass = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class; + assertEquals(expectedExceptionClass, _lastExceptionListenerException.getCause().getClass()); + } + + /** + * It is reasonable for an application to perform Connection#close within the exception + * listener. This test verifies that close is allowed, and proceeds without generating + * further exceptions. + */ + public void testExceptionListenerClosesConnection_IsAllowed() throws Exception + { + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1); + final Connection connection = getConnection(); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + try + { + connection.close(); + // PASS + } + catch (Throwable t) + { + _lastExceptionListenerException = t; + } + finally + { + exceptionReceivedLatch.countDown(); + } + } + }; + connection.setExceptionListener(listener); + + + stopBroker(); + + boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); + assertNull("Connection#close() should not have thrown exception", _lastExceptionListenerException); + } + + /** + * Spring's SingleConnectionFactory installs an ExceptionListener that calls stop() + * and ignores any IllegalStateException that result. This test serves to test this + * scenario. + */ + public void testExceptionListenerStopsConnection_ThrowsIllegalStateException() throws Exception + { + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1); + final Connection connection = getConnection(); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + try + { + connection.stop(); + fail("Exception not thrown"); + } + catch (IllegalStateException ise) + { + // PASS + } + catch (Throwable t) + { + _lastExceptionListenerException = t; + } + finally + { + exceptionReceivedLatch.countDown(); + } + } + }; + connection.setExceptionListener(listener); + + stopBroker(); + + boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); + assertNull("Connection#stop() should not have thrown unexpected exception", _lastExceptionListenerException); + } + + /** + * This test reproduces a deadlock that was the subject of a support call. A Spring based + * application was using SingleConnectionFactory. It installed an ExceptionListener that + * stops and closes the connection in response to any exception. On receipt of a message + * the application would create a new session then send a response message (within onMessage). + * It appears that a misconfiguration in the application meant that some of these messages + * were bounced (no-route). Bounces are treated like connection exceptions and are passed + * back to the application via the ExceptionListener. The deadlock occurred between the + * ExceptionListener's call to stop() and the MessageListener's attempt to create a new + * session. + */ + public void testExceptionListenerConnectionStopDeadlock() throws Exception + { + Queue messageQueue = getTestQueue(); + + Map<String, String> options = new HashMap<String, String>(); + options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false)); + + final Connection connection = getConnectionWithOptions(options); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + session.createConsumer(messageQueue).close(); // Create queue by side-effect + + // Put 10 messages onto messageQueue + sendMessage(session, messageQueue, 10); + + // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException. + // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock, + // at least on my machine). + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + try + { + assertNotNull("JMS Exception must have cause", exception.getCause() ); + assertEquals("JMS Exception is of wrong type", AMQNoRouteException.class, exception.getCause().getClass()); + exceptionReceivedLatch.countDown(); + if (exceptionReceivedLatch.getCount() == 0) + { + connection.stop(); // ** Deadlock + connection.close(); + } + } + catch (Throwable t) + { + _lastExceptionListenerException = t; + } + } + }; + connection.setExceptionListener(listener); + + // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus + // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener). + final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");; + MessageListener redirectingMessageListener = new MessageListener() + { + @Override + public void onMessage(Message msg) + { + try + { + Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED); // ** Deadlock + mlSession.createProducer(unknownQueue).send(msg); + mlSession.commit(); + } + catch (JMSException je) + { + // Connection is closed by the listener, so exceptions here are expected. + LOGGER.debug("Expected exception - message listener got exception", je); + } + } + }; + + MessageConsumer consumer = session.createConsumer(messageQueue); + consumer.setMessageListener(redirectingMessageListener); + connection.start(); + + // Await the 2nd exception + boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); + assertNull("Exception listener should not have had experienced exception", _lastExceptionListenerException); + } +} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index a75d96fb85..8ea592daf9 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -62,6 +62,7 @@ org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* // c++ broker doesn't support message bouncing org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#* org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable +org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionListenerConnectionStopDeadlock // c++ broker expires messages on delivery or when the queue cleaner thread runs. org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 055ce25d75..02c2db1134 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -20,6 +20,7 @@ // Those tests are testing 0.8..-0-9-1 specific semantics org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#* org.apache.qpid.test.client.CloseOnNoRouteForMandatoryMessageTest#* +org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionListenerConnectionStopDeadlock org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute //this test checks explicitly for 0-8 flow control semantics |
