diff options
| author | Keith Wall <kwall@apache.org> | 2014-05-20 15:04:11 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-05-20 15:04:11 +0000 |
| commit | 65af0e588588f15bbb1498403ee49f3c41ed04a0 (patch) | |
| tree | 6c249ba8f011e0b033caa5ef77ec10fc5dbcfdc4 /qpid/java/broker-core | |
| parent | 7371feb185388d4bedda4ac10ee7c78a17023a7e (diff) | |
| download | qpid-python-65af0e588588f15bbb1498403ee49f3c41ed04a0.tar.gz | |
QPID-5715: [Java Broker] Make virtualhosts respect the states ACTIVE and STOPPED
* Changes in virtualhost state are now persisted to store.
* VirtualHostState eliminated. The PASSIVE state used when a BDB HA Virtualhost is in replica is replaced by UNAVAILABLE.
Work by me and Andrew MacBean <andymacbean@gmail.com>.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1596281 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
6 files changed, 67 insertions, 96 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java index f021db009e..881991b2d2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java @@ -27,7 +27,8 @@ public enum State STOPPED, ACTIVE, DELETED, - ERRORED(false); + ERRORED(false), + UNAVAILABLE; private final boolean _valid; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index cd3d7c889c..2bec380820 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.virtualhost; import java.security.AccessControlException; import java.security.PrivilegedAction; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -37,6 +36,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -108,7 +108,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry(); - private volatile VirtualHostState _state = VirtualHostState.INITIALISING; + private final AtomicReference<State> _state = new AtomicReference<>(State.UNINITIALIZED); private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -273,18 +273,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } } - } - private void checkVHostStateIsActive() - { - checkVHostState(VirtualHostState.ACTIVE); + addChangeListener(new StoreUpdatingChangeListener()); } - private void checkVHostState(VirtualHostState... states) + private void checkVHostStateIsActive() { - if (!Arrays.asList(states).contains(getVirtualHostState())) + if (_state.get() != State.ACTIVE) { - throw new IllegalStateException("The virtual hosts state of " + getVirtualHostState() + throw new IllegalStateException("The virtual host state of " + _state.get() + " does not permit this operation."); } } @@ -392,24 +389,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { return State.DELETED; } - VirtualHostState implementationState = getVirtualHostState(); - switch(implementationState) - { - case INITIALISING: - return State.UNINITIALIZED; - case ACTIVE: - return State.ACTIVE; - case PASSIVE: - // TODO - return State.ACTIVE; - case STOPPED: - return State.STOPPED; - case ERRORED: - return State.ERRORED; - default: - throw new IllegalStateException("Unsupported state:" + implementationState); - } - + return _state.get(); } @Override @@ -709,7 +689,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte //Stop Connections _connectionRegistry.close(); _dtxRegistry.close(); - closeStorage(); + closeMessageStore(); shutdownHouseKeeping(); _eventLogger.message(VirtualHostMessages.CLOSED(getName())); @@ -725,7 +705,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } - private void closeStorage() + private void closeMessageStore() { if (getMessageStore() != null) { @@ -808,11 +788,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _dtxRegistry; } - public VirtualHostState getVirtualHostState() - { - return _state; - } - public void block() { synchronized (_connectionRegistry) @@ -902,14 +877,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } - protected void setState(VirtualHostState state) + protected void reportIfError(State state) { - _state = state; - } - - protected void reportIfError(VirtualHostState state) - { - if (state == VirtualHostState.ERRORED) + if (state == State.ERRORED) { _eventLogger.message(VirtualHostMessages.ERRORED(getName())); } @@ -1239,6 +1209,16 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _housekeepingThreadCount; } + @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) + protected void doStop() + { + closeChildren(); + shutdownHouseKeeping(); + closeMessageStore(); + _state.set(State.STOPPED); + + } + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED ) private void doDelete() { @@ -1250,11 +1230,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'"); } - if (getVirtualHostState() == VirtualHostState.ACTIVE - || getVirtualHostState() == VirtualHostState.INITIALISING) - { - close(); - } + close(); MessageStore ms = getMessageStore(); if (ms != null) @@ -1268,8 +1244,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _logger.warn("Exception occurred on message store deletion", e); } } - setAttribute(VirtualHost.STATE, getState(), State.DELETED); - getDurableConfigurationStore().remove(asObjectRecord()); deleted(); } } @@ -1426,8 +1400,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } - @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED, State.QUIESCED}, desiredState = State.ACTIVE ) - protected void activate() + @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED, State.QUIESCED, State.STOPPED}, desiredState = State.ACTIVE ) + private void onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); @@ -1454,17 +1428,53 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); - VirtualHostState finalState = VirtualHostState.ERRORED; + State finalState = State.ERRORED; try { initialiseHouseKeeping(getHousekeepingCheckPeriod()); - finalState = VirtualHostState.ACTIVE; + finalState = State.ACTIVE; } finally { - _state = finalState; - reportIfError(_state); + _state.set(finalState); + reportIfError(_state.get()); } + } + private class StoreUpdatingChangeListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) + { + if (newState == State.DELETED) + { + getDurableConfigurationStore().remove(asObjectRecord()); + object.removeChangeListener(this); + } + } + + @Override + public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + getDurableConfigurationStore().update(false, asObjectRecord()); + } + } + + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 3c892a4d65..2ca1f1f5c8 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -104,8 +104,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); - VirtualHostState getVirtualHostState(); - public void block(); public void unblock(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java deleted file mode 100644 index b3b30fab82..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostState.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -public enum VirtualHostState -{ - INITIALISING, - ACTIVE, - PASSIVE, - STOPPED, - /** Terminal state that signifies the virtual host has experienced an unexpected condition. */ - ERRORED -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 1e270839bb..62c05991c5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -122,7 +122,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Creating new virtualhost with name : " + getName()); + LOGGER.debug("Creating new virtualhost with name : " + getName()); } Map<String, Object> hostAttributes = new HashMap<String, Object>(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); @@ -147,8 +147,6 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard } }); } - - host.start(); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index cec34bd890..64c2cdbd95 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -584,11 +584,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } - public VirtualHostState getVirtualHostState() - { - return VirtualHostState.ACTIVE; - } - public void block() { } |
