summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-02-09 14:31:33 +0000
committerKeith Wall <kwall@apache.org>2012-02-09 14:31:33 +0000
commit6ac39fefbb2d113c0f4413cd04d2019d92125292 (patch)
tree5ea6cc6f8eca48f980db413e7a0347b129c06888 /java/broker/src/main/java/org
parent9ea403d9d1f8ce1b01dfe0cc6e7a4e76a27b38c4 (diff)
downloadqpid-python-6ac39fefbb2d113c0f4413cd04d2019d92125292.tar.gz
QPID-3821: Uncaught exception thrown in QueueRunner.run() could cause QueueRunner to remain stuck in RUNNING state permanently
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1242339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java24
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java27
2 files changed, 30 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index e9d4290174..22a2029494 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -46,7 +46,6 @@ public class QueueRunner implements Runnable
private static int SCHEDULED = 1;
private static int RUNNING = 2;
-
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
private final AtomicBoolean _stateChange = new AtomicBoolean();
@@ -54,8 +53,6 @@ public class QueueRunner implements Runnable
private final AtomicLong _lastRunAgain = new AtomicLong();
private final AtomicLong _lastRunTime = new AtomicLong();
- private long _continues;
-
public QueueRunner(SimpleAMQQueue queue)
{
_queue = queue;
@@ -86,23 +83,22 @@ public class QueueRunner implements Runnable
}
else
{
- _logger.info(errorMessage + transe.getMessage());
+ _logger.info(errorMessage + ' ' + transe.getMessage());
}
}
finally
{
CurrentActor.remove();
- }
- _scheduled.compareAndSet(RUNNING, IDLE);
- long stateChangeCount = _queue.getStateChangeCount();
- _lastRunAgain.set(runAgain);
- _lastRunTime.set(System.nanoTime());
- if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
- {
- _continues++;
- if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ final long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
{
- _queue.execute(this);
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(this);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 857625bd65..8f3b7ae4ce 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.transport.TransportException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,18 +69,30 @@ class SubFlushRunner implements Runnable
}
catch (AMQException e)
{
- _logger.error(e);
+ _logger.error("Exception during asynchronous delivery by " + toString(), e);
}
- finally
+ catch (final TransportException transe)
{
- CurrentActor.remove();
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
}
- _scheduled.compareAndSet(RUNNING, IDLE);
- if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+ finally
{
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ CurrentActor.remove();
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
{
- getQueue().execute(this);
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ getQueue().execute(this);
+ }
}
}
}