diff options
author | Keith Wall <kwall@apache.org> | 2012-02-09 14:31:33 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-02-09 14:31:33 +0000 |
commit | 6ac39fefbb2d113c0f4413cd04d2019d92125292 (patch) | |
tree | 5ea6cc6f8eca48f980db413e7a0347b129c06888 /java/broker/src | |
parent | 9ea403d9d1f8ce1b01dfe0cc6e7a4e76a27b38c4 (diff) | |
download | qpid-python-6ac39fefbb2d113c0f4413cd04d2019d92125292.tar.gz |
QPID-3821: Uncaught exception thrown in QueueRunner.run() could cause QueueRunner to remain stuck in RUNNING state permanently
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1242339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java | 24 | ||||
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java | 27 |
2 files changed, 30 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index e9d4290174..22a2029494 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -46,7 +46,6 @@ public class QueueRunner implements Runnable private static int SCHEDULED = 1; private static int RUNNING = 2; - private final AtomicInteger _scheduled = new AtomicInteger(IDLE); private final AtomicBoolean _stateChange = new AtomicBoolean(); @@ -54,8 +53,6 @@ public class QueueRunner implements Runnable private final AtomicLong _lastRunAgain = new AtomicLong(); private final AtomicLong _lastRunTime = new AtomicLong(); - private long _continues; - public QueueRunner(SimpleAMQQueue queue) { _queue = queue; @@ -86,23 +83,22 @@ public class QueueRunner implements Runnable } else { - _logger.info(errorMessage + transe.getMessage()); + _logger.info(errorMessage + ' ' + transe.getMessage()); } } finally { CurrentActor.remove(); - } - _scheduled.compareAndSet(RUNNING, IDLE); - long stateChangeCount = _queue.getStateChangeCount(); - _lastRunAgain.set(runAgain); - _lastRunTime.set(System.nanoTime()); - if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) - { - _continues++; - if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + _scheduled.compareAndSet(RUNNING, IDLE); + final long stateChangeCount = _queue.getStateChangeCount(); + _lastRunAgain.set(runAgain); + _lastRunTime.set(System.nanoTime()); + if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) { - _queue.execute(this); + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + _queue.execute(this); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 857625bd65..8f3b7ae4ce 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -26,6 +26,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.transport.TransportException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,18 +69,30 @@ class SubFlushRunner implements Runnable } catch (AMQException e) { - _logger.error(e); + _logger.error("Exception during asynchronous delivery by " + toString(), e); } - finally + catch (final TransportException transe) { - CurrentActor.remove(); + final String errorMessage = "Problem during asynchronous delivery by " + toString(); + if(_logger.isDebugEnabled()) + { + _logger.debug(errorMessage, transe); + } + else + { + _logger.info(errorMessage + ' ' + transe.getMessage()); + } } - _scheduled.compareAndSet(RUNNING, IDLE); - if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended()) + finally { - if(_scheduled.compareAndSet(IDLE,SCHEDULED)) + CurrentActor.remove(); + _scheduled.compareAndSet(RUNNING, IDLE); + if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended()) { - getQueue().execute(this); + if(_scheduled.compareAndSet(IDLE,SCHEDULED)) + { + getQueue().execute(this); + } } } } |