diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java | 27 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java | 1 |
2 files changed, 27 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 246e056f0b..7a3367d215 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import java.util.ArrayList; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.Binding; @@ -47,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractExchange implements Exchange { + private static final Logger _logger = Logger.getLogger(AbstractExchange.class); private AMQShortString _name; private final AtomicBoolean _closed = new AtomicBoolean(); @@ -295,7 +298,29 @@ public abstract class AbstractExchange implements Exchange { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - final List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> allQueues = queues; + + boolean deletedQueues = false; + + for(BaseQueue q : allQueues) + { + if(q.isDeleted()) + { + if(!deletedQueues) + { + deletedQueues = true; + queues = new ArrayList<BaseQueue>(allQueues); + } + if(_logger.isDebugEnabled()) + { + _logger.debug("Exchange: " + getName() + " - attempt to enqueue message onto deleted queue " + String.valueOf(q.getNameShortString())); + } + queues.remove(q); + } + } + + if(!queues.isEmpty()) { _routedMessageCount.incrementAndGet(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 35b7cac1a1..cef7e2d0c8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -38,6 +38,7 @@ public interface BaseQueue extends TransactionLogResource void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException; boolean isDurable(); + boolean isDeleted(); AMQShortString getNameShortString(); } |
