From a2dfed6abeaad71e69d9d73c6db42b47d7d93c66 Mon Sep 17 00:00:00 2001 From: Alex Rudyy Date: Fri, 4 Jul 2014 01:16:07 +0000 Subject: QPID-5867: Add intruder protection functionality for a cluster of BDB HA virtual host nodes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607772 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/store/berkeleydb/HASettings.java | 2 + .../ReplicatedEnvironmentConfiguration.java | 1 + .../replication/ReplicatedEnvironmentFacade.java | 260 ++++++++++++++++++++- .../ReplicatedEnvironmentFacadeFactory.java | 6 + .../replication/ReplicationGroupListener.java | 5 + .../virtualhost/berkeleydb/BDBHAVirtualHost.java | 6 + .../berkeleydb/BDBHAVirtualHostImpl.java | 43 ++++ .../berkeleydb/BDBHARemoteReplicationNode.java | 3 + .../berkeleydb/BDBHARemoteReplicationNodeImpl.java | 8 + .../berkeleydb/BDBHAVirtualHostNode.java | 6 +- .../berkeleydb/BDBHAVirtualHostNodeImpl.java | 124 +++++++++- 11 files changed, 444 insertions(+), 20 deletions(-) (limited to 'qpid/java/bdbstore/src/main') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java index 31e9987182..fd4a7bc1c7 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/HASettings.java @@ -34,4 +34,6 @@ public interface HASettings extends FileBasedSettings int getPriority(); int getQuorumOverride(); + + String getHelperNodeName(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java index 90fb086dc5..ae0d4b13ae 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java @@ -34,4 +34,5 @@ public interface ReplicatedEnvironmentConfiguration extends StandardEnvironmentC int getPriority(); int getQuorumOverride(); Map getReplicationParameters(); + String getHelperNodeName(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index dff5fc372d..8b9039c792 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -33,6 +34,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,11 +49,13 @@ import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; +import org.codehaus.jackson.map.ObjectMapper; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -63,10 +67,13 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.EnvironmentFailureException; import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; +import com.sleepycat.je.rep.AppStateMonitor; import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.InsufficientAcksException; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.NodeType; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.RepInternal; import com.sleepycat.je.rep.ReplicatedEnvironment; @@ -143,6 +150,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan }}); public static final String TYPE = "BDB-HA"; + private static final String PERMITTED_NODE_LIST = "permittedNodes"; private final ReplicatedEnvironmentConfiguration _configuration; private final String _prettyGroupNodeName; @@ -165,6 +173,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final ConcurrentHashMap _cachedDatabases = new ConcurrentHashMap<>(); private final ConcurrentHashMap _cachedSequences = new ConcurrentHashMap<>(); + private final Set _permittedNodes = new CopyOnWriteArraySet(); public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) { @@ -190,7 +199,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); - // create environment in a separate thread to avoid renaming of the current thread by JE _environment = createEnvironment(true); populateExistingRemoteReplicationNodes(); @@ -298,7 +306,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe) { - boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException); + boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException || dbe instanceof RestartRequiredException); if (restart) { tryToRestartEnvironment(dbe); @@ -821,6 +829,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _environment = createEnvironment(false); + registerAppStateMonitorIfPermittedNodesSpecified(); + if (_stateChangeListener.get() != null) { _environment.setStateChangeListener(this); @@ -935,25 +945,34 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean designatedPrimary = _configuration.isDesignatedPrimary(); int priority = _configuration.getPriority(); int quorumOverride = _configuration.getQuorumOverride(); + String nodeName = _configuration.getName(); + String helperNodeName = _configuration.getHelperNodeName(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Creating environment"); LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); LOGGER.info("Group name " + groupName); - LOGGER.info("Node name " + _configuration.getName()); + LOGGER.info("Node name " + nodeName); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); + LOGGER.info("Helper host name " + helperNodeName); LOGGER.info("Durability " + _defaultDurability); LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); LOGGER.info("Node priority " + priority); LOGGER.info("Quorum override " + quorumOverride); + LOGGER.info("Permitted node list " + _permittedNodes); + } + + if (helperNodeName != null && _permittedNodes.isEmpty() && !helperHostPort.equals(hostPort)) + { + connectToHelperNodeAndCheckPermittedHosts(helperNodeName, helperHostPort, hostPort); } Map replicationEnvironmentParameters = new HashMap<>(ReplicatedEnvironmentFacade.REPCONFIG_DEFAULTS); replicationEnvironmentParameters.putAll(_configuration.getReplicationParameters()); - ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort); + ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, hostPort); replicationConfig.setHelperHosts(helperHostPort); replicationConfig.setDesignatedPrimary(designatedPrimary); replicationConfig.setNodePriority(priority); @@ -1111,6 +1130,144 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public void setPermittedNodes(Collection permittedNodes) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Setting permitted nodes to " + permittedNodes); + } + + _permittedNodes.clear(); + if (permittedNodes != null) + { + _permittedNodes.addAll(permittedNodes); + registerAppStateMonitorIfPermittedNodesSpecified(); + + ReplicationGroupListener listener = _replicationGroupListener.get(); + for(ReplicationNode node: _remoteReplicationNodes.values()) + { + if (!isNodePermitted(node)) + { + onIntruder(listener, node); + } + } + } + } + + private void registerAppStateMonitorIfPermittedNodesSpecified() + { + if (!_permittedNodes.isEmpty()) + { + byte[] data = permittedNodeListToBytes(_permittedNodes); + _environment.registerAppStateMonitor(new EnvironmentStateHolder(data)); + } + } + + private boolean isNodePermitted(ReplicationNode replicationNode) + { + if (_permittedNodes.isEmpty()) + { + return true; + } + + String nodeHostPort = getHostPort(replicationNode); + return _permittedNodes.contains(nodeHostPort); + } + + private String getHostPort(ReplicationNode replicationNode) + { + return replicationNode.getHostName() + ":" + replicationNode.getPort(); + } + + + private void onIntruder(ReplicationGroupListener replicationGroupListener, ReplicationNode replicationNode) + { + if (replicationGroupListener != null) + { + replicationGroupListener.onIntruderNode(replicationNode); + } + else + { + LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s", + replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes))); + } + } + + private byte[] permittedNodeListToBytes(Set permittedNodeList) + { + HashMap data = new HashMap(); + data.put(PERMITTED_NODE_LIST, permittedNodeList); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + objectMapper.writeValue(baos, data); + } + catch (Exception e) + { + throw new RuntimeException("Unexpected exception on serializing of permitted node list into json", e); + } + return baos.toByteArray(); + } + + Collection bytesToPermittedNodeList(byte[] applicationState) + { + if (applicationState == null || applicationState.length == 0) + { + return Collections.emptySet(); + } + + ObjectMapper objectMapper = new ObjectMapper(); + try + { + Map settings = objectMapper.readValue(applicationState, Map.class); + return (Collection)settings.get(PERMITTED_NODE_LIST); + } + catch (Exception e) + { + throw new RuntimeException("Unexpected exception on de-serializing of application state", e); + } + } + + private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String helperHostPort, String hostPort) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort)); + } + + Collection permittedNodes = null; + try + { + NodeState state = getRemoteNodeState(new ReplicationNodeImpl(helperNodeName, helperHostPort)); + byte[] applicationState = state.getAppState(); + permittedNodes = bytesToPermittedNodeList(applicationState); + } + catch (IOException e) + { + throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e); + } + catch (ServiceConnectFailedException e) + { + throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e); + } + catch (Exception e) + { + throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'", + helperNodeName, helperHostPort), e); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes))); + } + + if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort)) + { + throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort)); + } + } + private void populateExistingRemoteReplicationNodes() { ReplicationGroup group = _environment.getGroup(); @@ -1157,7 +1314,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan Map nodeStates = discoverNodeStates(_remoteReplicationNodes.values()); - executeDabasePingerOnNodeChangesIfMaster(nodeStates); + executeDatabasePingerOnNodeChangesIfMaster(nodeStates); notifyGroupListenerAboutNodeStates(nodeStates); } @@ -1187,7 +1344,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (env != null) { ReplicationGroup group = env.getGroup(); - Set nodes = new HashSet(group.getElectableNodes()); + Set nodes = new HashSet(group.getNodes()); String localNodeName = getNodeName(); Map removalMap = new HashMap(_remoteReplicationNodes); @@ -1205,9 +1362,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _remoteReplicationNodes.put(discoveredNodeName, replicationNode); - if (replicationGroupListener != null) + if (isNodePermitted(replicationNode)) { - replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode); + if (replicationGroupListener != null) + { + replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode); + } + } + else + { + onIntruder(replicationGroupListener, replicationNode); } } else @@ -1288,7 +1452,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return nodeStates; } - private void executeDabasePingerOnNodeChangesIfMaster(final Map nodeStates) + private void executeDatabasePingerOnNodeChangesIfMaster(final Map nodeStates) { if (ReplicatedEnvironment.State.MASTER == _environment.getState()) { @@ -1330,4 +1494,82 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan CLOSING, CLOSED } + + private static class EnvironmentStateHolder implements AppStateMonitor + { + private byte[] _data; + + private EnvironmentStateHolder(byte[] data) + { + this._data = data; + } + + @Override + public byte[] getAppState() + { + return _data; + } + } + + static class ReplicationNodeImpl implements ReplicationNode + { + + private final InetSocketAddress _address; + private final String _nodeName; + private final String _host; + private final int _port; + + public ReplicationNodeImpl(String nodeName, String hostPort) + { + String[] tokens = hostPort.split(":"); + if (tokens.length != 2) + { + throw new IllegalArgumentException("Unexpected host port value :" + hostPort); + } + _host = tokens[0]; + _port = Integer.parseInt(tokens[1]); + _nodeName = nodeName; + _address = new InetSocketAddress(_host, _port); + } + + @Override + public String getName() + { + return _nodeName; + } + + @Override + public NodeType getType() + { + return NodeType.ELECTABLE; + } + + @Override + public InetSocketAddress getSocketAddress() + { + return _address; + } + + @Override + public String getHostName() + { + return _host; + } + + @Override + public int getPort() + { + return _port; + } + + @Override + public String toString() + { + return "ReplicationNodeImpl{" + + "_nodeName='" + _nodeName + '\'' + + ", _host='" + _host + '\'' + + ", _port=" + _port + + '}'; + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index d7b01bc829..f67d232b9f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -65,6 +65,12 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return buildReplicationConfigParameters(parent); } + @Override + public String getHelperNodeName() + { + return settings.getHelperNodeName(); + } + @Override public int getQuorumOverride() { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java index e88b9e8651..7f9f0fcf64 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java @@ -48,4 +48,9 @@ public interface ReplicationGroupListener */ void onNodeState(ReplicationNode node, NodeState nodeState); + /** + * Invoked on intruder node detected + */ + void onIntruderNode(ReplicationNode node); + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java index f231638426..477ffb5a64 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.virtualhost.berkeleydb; +import java.util.List; + import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.DerivedAttribute; import org.apache.qpid.server.model.ManagedAttribute; @@ -34,6 +36,7 @@ public interface BDBHAVirtualHost> extends Virtual String COALESCING_SYNC = "coalescingSync"; String DURABILITY = "durability"; String STORE_PATH = "storePath"; + String PERMITTED_NODES = "permittedNodes"; @ManagedAttribute( defaultValue = "SYNC") String getLocalTransactionSynchronizationPolicy(); @@ -52,4 +55,7 @@ public interface BDBHAVirtualHost> extends Virtual @ManagedAttribute(mandatory = true, defaultValue = "0") Long getStoreOverfullSize(); + + @ManagedAttribute + List getPermittedNodes(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index ba210b470c..aaa5f6aca4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost.berkeleydb; +import java.util.List; import java.util.Map; import java.util.Set; @@ -54,6 +55,9 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost _permittedNodes; + @ManagedObjectFactoryConstructor public BDBHAVirtualHostImpl(final Map attributes, VirtualHostNode virtualHostNode) { @@ -127,6 +131,31 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost)proxyForValidation).getRemoteTransactionSynchronizationPolicy(); validateTransactionSynchronizationPolicy(policy); } + + if(changedAttributes.contains(PERMITTED_NODES)) + { + + List permittedNodes = ((BDBHAVirtualHost)proxyForValidation).getPermittedNodes(); + if (permittedNodes != null) + { + for (String permittedNode: permittedNodes) + { + String[] tokens = permittedNode.split(":"); + if (tokens.length != 2) + { + throw new IllegalArgumentException(String.format("Invalid permitted node specified '%s'. ", permittedNode)); + } + try + { + Integer.parseInt(tokens[1]); + } + catch(Exception e) + { + throw new IllegalArgumentException(String.format("Invalid port is specified in permitted node '%s'. ", permittedNode)); + } + } + } + } } private void validateTransactionSynchronizationPolicy(String policy) @@ -158,4 +187,18 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost getPermittedNodes() + { + return _permittedNodes; + } + + protected void applyPermittedNodes() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setPermittedNodes(getPermittedNodes()); + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java index 691096f59f..10b2b13bc9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java @@ -33,6 +33,7 @@ public interface BDBHARemoteReplicationNode _state; + private final boolean _isMonitor; public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode virtualHostNode, Map attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) { @@ -61,6 +62,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject(State.ACTIVE); + _isMonitor = (Boolean)attributes.get(MONITOR); } @Override @@ -99,6 +101,12 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject> extends public static final String ROLE = "role"; public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId"; public static final String JOIN_TIME = "joinTime"; + public static final String HELPER_NODE_NAME = "helperNodeName"; @ManagedAttribute(mandatory=true) String getGroupName(); @@ -65,4 +64,7 @@ public interface BDBHAVirtualHostNode> extends @DerivedAttribute Long getJoinTime(); + + @ManagedAttribute(persist = false) + String getHelperNodeName(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index cf1cea3efa..3868025096 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -36,6 +36,7 @@ import javax.security.auth.Subject; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.NodeType; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; @@ -44,6 +45,8 @@ import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.HostPortPair; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -107,6 +110,9 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode attributes, Broker broker) { @@ -202,6 +208,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode getRemoteReplicationNodes() @@ -260,6 +272,11 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode() + { + @Override + public Void execute() + { + addRemoteReplicationNode(node); + return null; + } + }); + } + + private void addRemoteReplicationNode(ReplicationNode node) { BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); remoteNode.create(); @@ -606,14 +641,40 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode() + { + @Override + public Void execute() + { + recoverRemoteReplicationNode(node); + return null; + } + }); + } + + private void recoverRemoteReplicationNode(ReplicationNode node) { BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); remoteNode.open(); } @Override - public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + public void onReplicationNodeRemovedFromGroup(final ReplicationNode node) + { + getTaskExecutor().submit(new Task() + { + @Override + public Void execute() + { + removeRemoteReplicationNode(node); + return null; + } + }); + } + + private void removeRemoteReplicationNode(ReplicationNode node) { BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName()); if (remoteNode != null) @@ -634,11 +695,55 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode() + { + @Override + public Void execute() + { + State state = getState(); + + if (state == State.ACTIVE) + { + try + { + stopAndSetStateTo(State.ERRORED); + } + finally + { + stopEnvironment(); + } + notifyStateChanged(state, State.ERRORED); + } + return null; + } + }); + } } private Map nodeToAttributes(ReplicationNode replicationNode) @@ -647,6 +752,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode