diff options
| author | Keith Wall <kwall@apache.org> | 2014-07-22 14:30:57 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-07-22 14:30:57 +0000 |
| commit | 3b2ba30a14f5c03c80be5562e2b6a6f03f2c9d47 (patch) | |
| tree | 10fab2e5b7647ae69e0a26735fbb07ff490cf607 /qpid/java | |
| parent | 74e5f4d065e6b7117704575e00cfb4c59869070b (diff) | |
| download | qpid-python-3b2ba30a14f5c03c80be5562e2b6a6f03f2c9d47.tar.gz | |
QPID-5912: [Java Broker] Prevent failure to send to a consumer on the straight through delivery path from preventing the message being enqueued to the store.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 99 insertions, 88 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f2ac1a7cdc..1d981beb54 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -82,11 +82,13 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.transport.TransportException; public abstract class AbstractQueue<X extends AbstractQueue<X>> extends AbstractConfiguredObject<X> @@ -931,90 +933,115 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber; final QueueEntry entry = getEntries().add(message); - if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) + try { - /* - * iterate over consumers and if any is at the end of the queue and can deliver this message, - * then deliver the message - */ - - Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate Delivery"), - new PrivilegedAction<Object>() - { - @Override - public Object run() + if (action != null || (exclusiveSub == null && _queueRunner.isIdle())) + { + Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate Delivery"), + new PrivilegedAction<Void>() { - - QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); - QueueConsumerList.ConsumerNode nextNode = node.findNext(); - if (nextNode == null) - { - nextNode = _consumerList.getHead().findNext(); - } - while (nextNode != null) + @Override + public Void run() { - if (_consumerList.updateMarkedNode(node, nextNode)) - { - break; - } - else - { - node = _consumerList.getMarkedNode(); - nextNode = node.findNext(); - if (nextNode == null) - { - nextNode = _consumerList.getHead().findNext(); - } - } + tryDeliverStraightThrough(entry); + return null; } - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while (entry.isAvailable() && loops != 0) - { - if (nextNode == null) - { - loops--; - nextNode = _consumerList.getHead(); - } - else - { - // if consumer at end, and active, offer - final QueueConsumer<?> sub = nextNode.getConsumer(); - deliverToConsumer(sub, entry); - + } + ); + } - } - nextNode = nextNode.findNext(); + if (entry.isAvailable()) + { + checkConsumersNotAheadOfDelivery(entry); - } + if (exclusiveSub != null) + { + deliverAsync(exclusiveSub); + } + else + { + deliverAsync(); + } + } - return null; - } - }); + checkForNotification(entry.getMessage()); + } + finally + { + if(action != null) + { + action.performAction(entry); + } } + } - if (entry.isAvailable()) + /** + * iterate over consumers and if any is at the end of the queue and can deliver this message, + * then deliver the message + */ + private void tryDeliverStraightThrough(final QueueEntry entry) + { + try { - checkConsumersNotAheadOfDelivery(entry); - - if (exclusiveSub != null) + QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode nextNode = node.findNext(); + if (nextNode == null) { - deliverAsync(exclusiveSub); + nextNode = _consumerList.getHead().findNext(); } - else + while (nextNode != null) { - deliverAsync(); - } - } + if (_consumerList.updateMarkedNode(node, nextNode)) + { + break; + } + else + { + node = _consumerList.getMarkedNode(); + nextNode = node.findNext(); + if (nextNode == null) + { + nextNode = _consumerList.getHead().findNext(); + } + } + } + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; - checkForNotification(entry.getMessage()); + while (entry.isAvailable() && loops != 0) + { + if (nextNode == null) + { + loops--; + nextNode = _consumerList.getHead(); + } + else + { + // if consumer at end, and active, offer + final QueueConsumer<?> sub = nextNode.getConsumer(); + deliverToConsumer(sub, entry); + + + } + nextNode = nextNode.findNext(); - if(action != null) + } + } + catch (ConnectionScopedRuntimeException | TransportException e) { - action.performAction(entry); + String errorMessage = "Suppressing " + e.getClass().getSimpleName() + + " during straight through delivery, as this" + + " can only indicate an issue with a consumer."; + if(_logger.isDebugEnabled()) + { + _logger.debug(errorMessage, e); + } + else + { + _logger.info(errorMessage + ' ' + e.getMessage()); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index f3f644b2f9..b4fb2aab9f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -21,16 +21,13 @@ package org.apache.qpid.server.queue; import java.security.PrivilegedAction; -import java.util.Collections; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.TaskPrincipal; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; @@ -79,7 +76,7 @@ public class QueueRunner implements Runnable { runAgain = _queue.processQueue(QueueRunner.this); } - catch (final ConnectionScopedRuntimeException e) + catch (ConnectionScopedRuntimeException | TransportException e) { final String errorMessage = "Problem during asynchronous delivery by " + toString(); if(_logger.isDebugEnabled()) @@ -91,18 +88,6 @@ public class QueueRunner implements Runnable _logger.info(errorMessage + ' ' + e.getMessage()); } } - catch (final TransportException transe) - { - final String errorMessage = "Problem during asynchronous delivery by " + toString(); - if(_logger.isDebugEnabled()) - { - _logger.debug(errorMessage, transe); - } - else - { - _logger.info(errorMessage + ' ' + transe.getMessage()); - } - } finally { _scheduled.compareAndSet(RUNNING, IDLE); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 73a23cd53e..b4f0fde1f7 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -24,12 +24,11 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.TaskPrincipal; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; import javax.security.auth.Subject; import java.security.PrivilegedAction; -import java.util.Collections; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -73,16 +72,16 @@ class SubFlushRunner implements Runnable { complete = getQueue().flushConsumer(_sub, ITERATIONS); } - catch (final TransportException transe) + catch (ConnectionScopedRuntimeException | TransportException e) { final String errorMessage = "Problem during asynchronous delivery by " + toString(); if(_logger.isDebugEnabled()) { - _logger.debug(errorMessage, transe); + _logger.debug(errorMessage, e); } else { - _logger.info(errorMessage + ' ' + transe.getMessage()); + _logger.info(errorMessage + ' ' + e.getMessage()); } } finally |
