From ab57937c2f608245bae1671e52620cd4daf96414 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 4 Feb 2011 08:14:00 +0000 Subject: QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads. Applied patch from Keith Wall git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1067108 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 37 +++++++++++++------- .../qpid/client/protocol/AMQProtocolHandler.java | 39 ++++++++++++++++++---- 2 files changed, 57 insertions(+), 19 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5c41e483c..1f940b62f0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -119,7 +119,6 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -363,7 +362,7 @@ public abstract class AMQSessionNote that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + *

Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close * the channel. This is because the channel is marked as closed before the request to close it is made, so the * fail-over should not re-open it. * - * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. * @todo Be aware of possible changes to parameter order as versions change. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c16941b341..eb5af119b2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.mina.filter.codec.ProtocolCodecException; @@ -63,6 +64,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; @@ -100,7 +102,7 @@ import org.slf4j.LoggerFactory; * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection * and the protocol session data is held outside of the MINA IOSession. * - *

This handler is responsibile for setting up the filter chain to filter all events for this handler through. + *

This handler is responsible for setting up the filter chain to filter all events for this handler through. * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. @@ -114,8 +116,8 @@ import org.slf4j.LoggerFactory; * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * be merged, although there is sense in keeping the session model separate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler implements ProtocolEngine @@ -158,7 +160,7 @@ public class AMQProtocolHandler implements ProtocolEngine /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ + /** The last failover exception that occurred */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ @@ -187,6 +189,21 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); + _poolReference.setThreadFactory(new ThreadFactory() + { + + public Thread newThread(final Runnable runnable) + { + try + { + return Threading.getThreadFactory().createThread(runnable); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + } + }); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); @@ -275,7 +292,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if(!_connection.isClosed()) { - Thread failoverThread = new Thread(_failoverHandler); + final Thread failoverThread; + try + { + failoverThread = Threading.getThreadFactory().createThread(_failoverHandler); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } failoverThread.setName("Failover"); // Do not inherit daemon-ness from current thread as this can be a daemon // thread such as a AnonymousIoService thread. @@ -369,7 +394,7 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any * protocol level waits. * * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should @@ -407,7 +432,7 @@ public class AMQProtocolHandler implements ProtocolEngine } //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be - // interupted unless failover cannot restore the state. + // interrupted unless failover cannot restore the state. propagateExceptionToFrameListeners(_lastFailoverException); } -- cgit v1.2.1