diff options
| author | Keith Wall <kwall@apache.org> | 2014-10-28 14:16:18 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-10-28 14:16:18 +0000 |
| commit | f791d3b16bd935284b312ce80fbee6a904e9e681 (patch) | |
| tree | 091028434511b2e5031efdc7ec8c25be13ca3dc5 | |
| parent | 3dba41c0af250e905e1b37d53d9447aa77a2a0fd (diff) | |
| download | qpid-python-f791d3b16bd935284b312ce80fbee6a904e9e681.tar.gz | |
QPID-6192: [Java Broker] On close, close the connections before exchanges/queues
* Exchanges/Queue now check virtualhost state prior to routing a message to queue/consumer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634884 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 33 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 989a4abea5..29c3007eaf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -507,8 +507,13 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> final ServerTransaction txn, final Action<? super MessageInstance> postEnqueueAction) { - List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); + if (_virtualHost.getState() != State.ACTIVE) + { + _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + return 0; + } + List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); if(queues == null || queues.isEmpty()) { Exchange altExchange = getAlternateExchange(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index bfd1d83249..6d2d7e33b5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -490,6 +490,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { + beforeClose(); closeChildren(); onClose(); unregister(false); @@ -497,6 +498,10 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + protected void beforeClose() + { + } + protected void onClose() { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java index 3dfc272053..a1cbb0fc15 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java @@ -78,9 +78,9 @@ public final class BrokerModel extends Model addRelationship(VirtualHostNode.class, VirtualHost.class); addRelationship(VirtualHostNode.class, RemoteReplicationNode.class); + addRelationship(VirtualHost.class, Connection.class); addRelationship(VirtualHost.class, Exchange.class); addRelationship(VirtualHost.class, Queue.class); - addRelationship(VirtualHost.class, Connection.class); addRelationship(Port.class, VirtualHostAlias.class); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 53e446ba2d..8679e5c43a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1923,6 +1923,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> sub.releaseSendLock(); } } + + if (_virtualHost.getState() != State.ACTIVE) + { + _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState()); + return true; + } } } finally @@ -1967,12 +1973,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> boolean atTail = false; boolean subActive = sub.isActive() && !sub.isSuspended(); + if (subActive) { QueueEntry node = getNextAvailableEntry(sub); - if (node != null && node.isAvailable()) + if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable()) { if (sub.hasInterest(node) && mightAssign(sub, node)) { @@ -2178,6 +2185,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> sub.flushBatched(); break; } + if (_virtualHost.getState() != State.ACTIVE) + { + _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState()); + + break; + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 97af9ecf2b..a0a89d9f12 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -664,9 +664,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _broker.getSecurityManager(); } - protected void onClose() + @Override + protected void beforeClose() { setState(State.UNAVAILABLE); + } + + @Override + protected void onClose() + { //Stop Connections _connectionRegistry.close(); _dtxRegistry.close(); |
