summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-10-28 14:16:18 +0000
committerKeith Wall <kwall@apache.org>2014-10-28 14:16:18 +0000
commitf791d3b16bd935284b312ce80fbee6a904e9e681 (patch)
tree091028434511b2e5031efdc7ec8c25be13ca3dc5
parent3dba41c0af250e905e1b37d53d9447aa77a2a0fd (diff)
downloadqpid-python-f791d3b16bd935284b312ce80fbee6a904e9e681.tar.gz
QPID-6192: [Java Broker] On close, close the connections before exchanges/queues
* Exchanges/Queue now check virtualhost state prior to routing a message to queue/consumer. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634884 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java8
5 files changed, 33 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 989a4abea5..29c3007eaf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -507,8 +507,13 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ return 0;
+ }
+ List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
if(queues == null || queues.isEmpty())
{
Exchange altExchange = getAlternateExchange();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index bfd1d83249..6d2d7e33b5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -490,6 +490,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
+ beforeClose();
closeChildren();
onClose();
unregister(false);
@@ -497,6 +498,10 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ protected void beforeClose()
+ {
+ }
+
protected void onClose()
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
index 3dfc272053..a1cbb0fc15 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
@@ -78,9 +78,9 @@ public final class BrokerModel extends Model
addRelationship(VirtualHostNode.class, VirtualHost.class);
addRelationship(VirtualHostNode.class, RemoteReplicationNode.class);
+ addRelationship(VirtualHost.class, Connection.class);
addRelationship(VirtualHost.class, Exchange.class);
addRelationship(VirtualHost.class, Queue.class);
- addRelationship(VirtualHost.class, Connection.class);
addRelationship(Port.class, VirtualHostAlias.class);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 53e446ba2d..8679e5c43a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1923,6 +1923,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
sub.releaseSendLock();
}
}
+
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
+ return true;
+ }
}
}
finally
@@ -1967,12 +1973,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
boolean atTail = false;
boolean subActive = sub.isActive() && !sub.isSuspended();
+
if (subActive)
{
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && node.isAvailable())
+ if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
@@ -2178,6 +2185,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
sub.flushBatched();
break;
}
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
+
+ break;
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 97af9ecf2b..a0a89d9f12 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -664,9 +664,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _broker.getSecurityManager();
}
- protected void onClose()
+ @Override
+ protected void beforeClose()
{
setState(State.UNAVAILABLE);
+ }
+
+ @Override
+ protected void onClose()
+ {
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();