summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-05-20 15:04:11 +0000
committerKeith Wall <kwall@apache.org>2014-05-20 15:04:11 +0000
commit65af0e588588f15bbb1498403ee49f3c41ed04a0 (patch)
tree6c249ba8f011e0b033caa5ef77ec10fc5dbcfdc4 /qpid/java/broker-core
parent7371feb185388d4bedda4ac10ee7c78a17023a7e (diff)
downloadqpid-python-65af0e588588f15bbb1498403ee49f3c41ed04a0.tar.gz
QPID-5715: [Java Broker] Make virtualhosts respect the states ACTIVE and STOPPED
* Changes in virtualhost state are now persisted to store. * VirtualHostState eliminated. The PASSIVE state used when a BDB HA Virtualhost is in replica is replaced by UNAVAILABLE. Work by me and Andrew MacBean <andymacbean@gmail.com>. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1596281 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java118
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java5
6 files changed, 67 insertions, 96 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java
index f021db009e..881991b2d2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java
@@ -27,7 +27,8 @@ public enum State
STOPPED,
ACTIVE,
DELETED,
- ERRORED(false);
+ ERRORED(false),
+ UNAVAILABLE;
private final boolean _valid;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index cd3d7c889c..2bec380820 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.virtualhost;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -37,6 +36,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -108,7 +108,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
- private volatile VirtualHostState _state = VirtualHostState.INITIALISING;
+ private final AtomicReference<State> _state = new AtomicReference<>(State.UNINITIALIZED);
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -273,18 +273,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
}
- }
- private void checkVHostStateIsActive()
- {
- checkVHostState(VirtualHostState.ACTIVE);
+ addChangeListener(new StoreUpdatingChangeListener());
}
- private void checkVHostState(VirtualHostState... states)
+ private void checkVHostStateIsActive()
{
- if (!Arrays.asList(states).contains(getVirtualHostState()))
+ if (_state.get() != State.ACTIVE)
{
- throw new IllegalStateException("The virtual hosts state of " + getVirtualHostState()
+ throw new IllegalStateException("The virtual host state of " + _state.get()
+ " does not permit this operation.");
}
}
@@ -392,24 +389,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
return State.DELETED;
}
- VirtualHostState implementationState = getVirtualHostState();
- switch(implementationState)
- {
- case INITIALISING:
- return State.UNINITIALIZED;
- case ACTIVE:
- return State.ACTIVE;
- case PASSIVE:
- // TODO
- return State.ACTIVE;
- case STOPPED:
- return State.STOPPED;
- case ERRORED:
- return State.ERRORED;
- default:
- throw new IllegalStateException("Unsupported state:" + implementationState);
- }
-
+ return _state.get();
}
@Override
@@ -709,7 +689,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();
- closeStorage();
+ closeMessageStore();
shutdownHouseKeeping();
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
@@ -725,7 +705,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- private void closeStorage()
+ private void closeMessageStore()
{
if (getMessageStore() != null)
{
@@ -808,11 +788,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _dtxRegistry;
}
- public VirtualHostState getVirtualHostState()
- {
- return _state;
- }
-
public void block()
{
synchronized (_connectionRegistry)
@@ -902,14 +877,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- protected void setState(VirtualHostState state)
+ protected void reportIfError(State state)
{
- _state = state;
- }
-
- protected void reportIfError(VirtualHostState state)
- {
- if (state == VirtualHostState.ERRORED)
+ if (state == State.ERRORED)
{
_eventLogger.message(VirtualHostMessages.ERRORED(getName()));
}
@@ -1239,6 +1209,16 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _housekeepingThreadCount;
}
+ @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED )
+ protected void doStop()
+ {
+ closeChildren();
+ shutdownHouseKeeping();
+ closeMessageStore();
+ _state.set(State.STOPPED);
+
+ }
+
@StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
private void doDelete()
{
@@ -1250,11 +1230,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'");
}
- if (getVirtualHostState() == VirtualHostState.ACTIVE
- || getVirtualHostState() == VirtualHostState.INITIALISING)
- {
- close();
- }
+ close();
MessageStore ms = getMessageStore();
if (ms != null)
@@ -1268,8 +1244,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_logger.warn("Exception occurred on message store deletion", e);
}
}
- setAttribute(VirtualHost.STATE, getState(), State.DELETED);
- getDurableConfigurationStore().remove(asObjectRecord());
deleted();
}
}
@@ -1426,8 +1400,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
}
- @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED, State.QUIESCED}, desiredState = State.ACTIVE )
- protected void activate()
+ @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED, State.QUIESCED, State.STOPPED}, desiredState = State.ACTIVE )
+ private void onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
@@ -1454,17 +1428,53 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
- VirtualHostState finalState = VirtualHostState.ERRORED;
+ State finalState = State.ERRORED;
try
{
initialiseHouseKeeping(getHousekeepingCheckPeriod());
- finalState = VirtualHostState.ACTIVE;
+ finalState = State.ACTIVE;
}
finally
{
- _state = finalState;
- reportIfError(_state);
+ _state.set(finalState);
+ reportIfError(_state.get());
}
+
}
+ private class StoreUpdatingChangeListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
+ {
+ if (newState == State.DELETED)
+ {
+ getDurableConfigurationStore().remove(asObjectRecord());
+ object.removeChangeListener(this);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ getDurableConfigurationStore().update(false, asObjectRecord());
+ }
+ }
+
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 3c892a4d65..2ca1f1f5c8 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -104,8 +104,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
- VirtualHostState getVirtualHostState();
-
public void block();
public void unblock();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java
deleted file mode 100644
index b3b30fab82..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-public enum VirtualHostState
-{
- INITIALISING,
- ACTIVE,
- PASSIVE,
- STOPPED,
- /** Terminal state that signifies the virtual host has experienced an unexpected condition. */
- ERRORED
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index 1e270839bb..62c05991c5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -122,7 +122,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Creating new virtualhost with name : " + getName());
+ LOGGER.debug("Creating new virtualhost with name : " + getName());
}
Map<String, Object> hostAttributes = new HashMap<String, Object>();
hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
@@ -147,8 +147,6 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
}
});
}
-
- host.start();
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index cec34bd890..64c2cdbd95 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -584,11 +584,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
- public VirtualHostState getVirtualHostState()
- {
- return VirtualHostState.ACTIVE;
- }
-
public void block()
{
}