summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-07-22 14:30:57 +0000
committerKeith Wall <kwall@apache.org>2014-07-22 14:30:57 +0000
commit3b2ba30a14f5c03c80be5562e2b6a6f03f2c9d47 (patch)
tree10fab2e5b7647ae69e0a26735fbb07ff490cf607 /qpid/java
parent74e5f4d065e6b7117704575e00cfb4c59869070b (diff)
downloadqpid-python-3b2ba30a14f5c03c80be5562e2b6a6f03f2c9d47.tar.gz
QPID-5912: [Java Broker] Prevent failure to send to a consumer on the straight through delivery path from preventing the message being enqueued to the store.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java161
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java17
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java9
3 files changed, 99 insertions, 88 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index f2ac1a7cdc..1d981beb54 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -82,11 +82,13 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
@@ -931,90 +933,115 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message);
- if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
+ try
{
- /*
- * iterate over consumers and if any is at the end of the queue and can deliver this message,
- * then deliver the message
- */
-
- Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate Delivery"),
- new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
+ if (action != null || (exclusiveSub == null && _queueRunner.isIdle()))
+ {
+ Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate Delivery"),
+ new PrivilegedAction<Void>()
{
-
- QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
- QueueConsumerList.ConsumerNode nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- while (nextNode != null)
+ @Override
+ public Void run()
{
- if (_consumerList.updateMarkedNode(node, nextNode))
- {
- break;
- }
- else
- {
- node = _consumerList.getMarkedNode();
- nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- }
+ tryDeliverStraightThrough(entry);
+ return null;
}
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while (entry.isAvailable() && loops != 0)
- {
- if (nextNode == null)
- {
- loops--;
- nextNode = _consumerList.getHead();
- }
- else
- {
- // if consumer at end, and active, offer
- final QueueConsumer<?> sub = nextNode.getConsumer();
- deliverToConsumer(sub, entry);
-
+ }
+ );
+ }
- }
- nextNode = nextNode.findNext();
+ if (entry.isAvailable())
+ {
+ checkConsumersNotAheadOfDelivery(entry);
- }
+ if (exclusiveSub != null)
+ {
+ deliverAsync(exclusiveSub);
+ }
+ else
+ {
+ deliverAsync();
+ }
+ }
- return null;
- }
- });
+ checkForNotification(entry.getMessage());
+ }
+ finally
+ {
+ if(action != null)
+ {
+ action.performAction(entry);
+ }
}
+ }
- if (entry.isAvailable())
+ /**
+ * iterate over consumers and if any is at the end of the queue and can deliver this message,
+ * then deliver the message
+ */
+ private void tryDeliverStraightThrough(final QueueEntry entry)
+ {
+ try
{
- checkConsumersNotAheadOfDelivery(entry);
-
- if (exclusiveSub != null)
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
+ if (nextNode == null)
{
- deliverAsync(exclusiveSub);
+ nextNode = _consumerList.getHead().findNext();
}
- else
+ while (nextNode != null)
{
- deliverAsync();
- }
- }
+ if (_consumerList.updateMarkedNode(node, nextNode))
+ {
+ break;
+ }
+ else
+ {
+ node = _consumerList.getMarkedNode();
+ nextNode = node.findNext();
+ if (nextNode == null)
+ {
+ nextNode = _consumerList.getHead().findNext();
+ }
+ }
+ }
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
- checkForNotification(entry.getMessage());
+ while (entry.isAvailable() && loops != 0)
+ {
+ if (nextNode == null)
+ {
+ loops--;
+ nextNode = _consumerList.getHead();
+ }
+ else
+ {
+ // if consumer at end, and active, offer
+ final QueueConsumer<?> sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
+
+
+ }
+ nextNode = nextNode.findNext();
- if(action != null)
+ }
+ }
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
- action.performAction(entry);
+ String errorMessage = "Suppressing " + e.getClass().getSimpleName() +
+ " during straight through delivery, as this" +
+ " can only indicate an issue with a consumer.";
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, e);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + e.getMessage());
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index f3f644b2f9..b4fb2aab9f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -21,16 +21,13 @@
package org.apache.qpid.server.queue;
import java.security.PrivilegedAction;
-import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
@@ -79,7 +76,7 @@ public class QueueRunner implements Runnable
{
runAgain = _queue.processQueue(QueueRunner.this);
}
- catch (final ConnectionScopedRuntimeException e)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
final String errorMessage = "Problem during asynchronous delivery by " + toString();
if(_logger.isDebugEnabled())
@@ -91,18 +88,6 @@ public class QueueRunner implements Runnable
_logger.info(errorMessage + ' ' + e.getMessage());
}
}
- catch (final TransportException transe)
- {
- final String errorMessage = "Problem during asynchronous delivery by " + toString();
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, transe);
- }
- else
- {
- _logger.info(errorMessage + ' ' + transe.getMessage());
- }
- }
finally
{
_scheduled.compareAndSet(RUNNING, IDLE);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 73a23cd53e..b4f0fde1f7 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -24,12 +24,11 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
import javax.security.auth.Subject;
import java.security.PrivilegedAction;
-import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,16 +72,16 @@ class SubFlushRunner implements Runnable
{
complete = getQueue().flushConsumer(_sub, ITERATIONS);
}
- catch (final TransportException transe)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
final String errorMessage = "Problem during asynchronous delivery by " + toString();
if(_logger.isDebugEnabled())
{
- _logger.debug(errorMessage, transe);
+ _logger.debug(errorMessage, e);
}
else
{
- _logger.info(errorMessage + ' ' + transe.getMessage());
+ _logger.info(errorMessage + ' ' + e.getMessage());
}
}
finally