From 9a5e9e59763989f2e1ceb8e7bc32376a203bc0d9 Mon Sep 17 00:00:00 2001 From: Alex Rudyy Date: Thu, 28 Aug 2014 09:43:27 +0000 Subject: QPID-6051: Fix handling of exceptions thrown from post commit or deferred actions on transaction commit git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621106 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/txn/LocalTransaction.java | 32 ++++++++++++++++++---- .../qpid/server/protocol/v0_8/AMQChannel.java | 14 +++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index e371bcdb02..f5d32d2e20 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,9 +281,9 @@ public class LocalTransaction implements ServerTransaction public StoreFuture commitAsync(final Runnable deferred) { sync(); + StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; try { - StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; if(_transaction != null) { future = new StoreFuture() @@ -325,8 +327,7 @@ public class LocalTransaction implements ServerTransaction } catch (RuntimeException e) { - doRollbackActions(); - throw e; + handleUnexpectedException(e); } finally { @@ -350,21 +351,40 @@ public class LocalTransaction implements ServerTransaction } } - return future; } catch (RuntimeException e) { try { - doRollbackActions(); + handleUnexpectedException(e); } finally { resetDetails(); } - throw e; } + return future; + } + private void handleUnexpectedException(RuntimeException e) + { + if(e instanceof ConnectionScopedRuntimeException || e instanceof TransportException) + { + throw e; + } + else + { + _logger.error("Unexpected exception on execution of post commit deferred actions", e); + boolean continueOnError = Boolean.getBoolean("qpid.broker.exceptionHandler.continue"); + if (continueOnError) + { + throw e; + } + else + { + Runtime.getRuntime().halt(1); + } + } } private void doPostTransactionActions() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d1ec2e139e..f6ef4256d0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1109,10 +1109,16 @@ public class AMQChannel> @Override public void run() { - immediateAction.run(); - _txnCommits.incrementAndGet(); - _txnStarts.incrementAndGet(); - decrementOutstandingTxnsIfNecessary(); + try + { + immediateAction.run(); + } + finally + { + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } } }); } -- cgit v1.2.1