diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-04-10 13:33:43 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-04-10 13:33:43 +0000 |
| commit | 015c589b44da40867fe08a86c5dd3c8b7c75aae8 (patch) | |
| tree | a7d8d48754657bf9a292d49cdac9c9ded040effd /java/broker | |
| parent | 2ff7d28cb4a453a23811f1a7eda407996601e164 (diff) | |
| download | qpid-python-015c589b44da40867fe08a86c5dd3c8b7c75aae8.tar.gz | |
QPID-2789 : [Java Broker] Prevent additional messages being enqueued after a queue is deleted.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1466482 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java | 27 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java | 1 |
2 files changed, 27 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 246e056f0b..7a3367d215 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import java.util.ArrayList; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.Binding; @@ -47,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractExchange implements Exchange { + private static final Logger _logger = Logger.getLogger(AbstractExchange.class); private AMQShortString _name; private final AtomicBoolean _closed = new AtomicBoolean(); @@ -295,7 +298,29 @@ public abstract class AbstractExchange implements Exchange { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - final List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> allQueues = queues; + + boolean deletedQueues = false; + + for(BaseQueue q : allQueues) + { + if(q.isDeleted()) + { + if(!deletedQueues) + { + deletedQueues = true; + queues = new ArrayList<BaseQueue>(allQueues); + } + if(_logger.isDebugEnabled()) + { + _logger.debug("Exchange: " + getName() + " - attempt to enqueue message onto deleted queue " + String.valueOf(q.getNameShortString())); + } + queues.remove(q); + } + } + + if(!queues.isEmpty()) { _routedMessageCount.incrementAndGet(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 35b7cac1a1..cef7e2d0c8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -38,6 +38,7 @@ public interface BaseQueue extends TransactionLogResource void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException; boolean isDurable(); + boolean isDeleted(); AMQShortString getNameShortString(); } |
