summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-08-28 09:43:27 +0000
committerAlex Rudyy <orudyy@apache.org>2014-08-28 09:43:27 +0000
commit9a5e9e59763989f2e1ceb8e7bc32376a203bc0d9 (patch)
tree77ec1d60e28f2c56cd9d0617415f7abccfc1d1eb /qpid/java
parentbdabd82ea49d500770cd680e97bdefcaeb3b48b4 (diff)
downloadqpid-python-9a5e9e59763989f2e1ceb8e7bc32376a203bc0d9.tar.gz
QPID-6051: Fix handling of exceptions thrown from post commit or deferred actions on transaction commit
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621106 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java32
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java14
2 files changed, 36 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index e371bcdb02..f5d32d2e20 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,9 +281,9 @@ public class LocalTransaction implements ServerTransaction
public StoreFuture commitAsync(final Runnable deferred)
{
sync();
+ StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
try
{
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
if(_transaction != null)
{
future = new StoreFuture()
@@ -325,8 +327,7 @@ public class LocalTransaction implements ServerTransaction
}
catch (RuntimeException e)
{
- doRollbackActions();
- throw e;
+ handleUnexpectedException(e);
}
finally
{
@@ -350,21 +351,40 @@ public class LocalTransaction implements ServerTransaction
}
}
- return future;
}
catch (RuntimeException e)
{
try
{
- doRollbackActions();
+ handleUnexpectedException(e);
}
finally
{
resetDetails();
}
- throw e;
}
+ return future;
+ }
+ private void handleUnexpectedException(RuntimeException e)
+ {
+ if(e instanceof ConnectionScopedRuntimeException || e instanceof TransportException)
+ {
+ throw e;
+ }
+ else
+ {
+ _logger.error("Unexpected exception on execution of post commit deferred actions", e);
+ boolean continueOnError = Boolean.getBoolean("qpid.broker.exceptionHandler.continue");
+ if (continueOnError)
+ {
+ throw e;
+ }
+ else
+ {
+ Runtime.getRuntime().halt(1);
+ }
+ }
}
private void doPostTransactionActions()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d1ec2e139e..f6ef4256d0 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1109,10 +1109,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
@Override
public void run()
{
- immediateAction.run();
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ try
+ {
+ immediateAction.run();
+ }
+ finally
+ {
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
}
});
}