From e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 13:37:00 +0000 Subject: QPID-6374 : tidyup calls to connection task pool git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658714 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 40 ++++++++---- .../org/apache/qpid/client/AMQSession_0_8.java | 71 ++++++++++++---------- 2 files changed, 67 insertions(+), 44 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5518435b94..4c596b88a0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1353,16 +1354,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (exceptionListener != null) { performConnectionTask(new Runnable() - { - @Override - public void run() - { - // deliver the exception if there is a listener - exceptionListener.onException(je); - } - }); - } - else + { + @Override + public void run() + { + // deliver the exception if there is a listener + try + { + exceptionListener.onException(je); + } + catch (RuntimeException e) + { + _logger.error("Exception occurred in ExceptionListener", e); + } + } + }); + } + else { _logger.error("Throwable Received but no listener set: " + cause); } @@ -1478,7 +1486,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void performConnectionTask(Runnable task) { - _taskPool.execute(task); + try + { + _taskPool.execute(task); + } + catch (RejectedExecutionException e) + { + if(!(isClosed() || isClosing())) + { + throw e; + } + } } public AMQSession getSession(int channelId) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index bb0f0d9b13..143de271a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQSession