diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-04-30 13:08:49 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-04-30 13:08:49 +0000 |
| commit | aeb2793707799a2d459888fc0ed0dd9faf1c7c78 (patch) | |
| tree | c921c90de8252dc3ed3d5ec8e47bcdd42c73cf9c /qpid/java/bdbstore/src | |
| parent | 472d84f65a13f2882252cf85fbfb99457294518f (diff) | |
| download | qpid-python-aeb2793707799a2d459888fc0ed0dd9faf1c7c78.tar.gz | |
QPID-5715,QPID-5412: Add remote replication nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1591281 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
7 files changed, 1073 insertions, 88 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 0f839ea02d..a669904664 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -67,6 +68,7 @@ import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.RepInternal; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationGroup; import com.sleepycat.je.rep.ReplicationMutableConfig; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.RestartRequiredException; @@ -147,6 +149,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final ScheduledExecutorService _groupChangeExecutor; private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); + private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>(); + private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); private final AtomicBoolean _initialised; private final EnvironmentFacadeTask[] _initialisationTasks; @@ -178,10 +182,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); - _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS); // TODO make configurable // create environment in a separate thread to avoid renaming of the current thread by JE _environment = createEnvironment(true); + populateExistingRemoteReplicationNodes(); + _groupChangeExecutor.submit(new RemoteNodeStateLearner()); } @Override @@ -805,6 +810,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.warn("Ignoring an exception whilst closing databases", e); } } + else + { + // reset database holders for invalid environments + for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) + { + DatabaseHolder databaseHolder = entry.getValue(); + Database database = databaseHolder.getDatabase(); + if (database != null) + { + databaseHolder.setDatabase(null); + } + } + } environment.close(); } catch (EnvironmentFailureException efe) @@ -1004,7 +1022,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException + NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException { if (repNode == null) { @@ -1023,84 +1041,229 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return _environment.getGroup().getElectableNodes().size(); } + public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener) + { + if (_replicationGroupListener.compareAndSet(null, replicationGroupListener)) + { + notifyExistingRemoteReplicationNodes(replicationGroupListener); + } + else + { + throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName); + } + } + + private void populateExistingRemoteReplicationNodes() + { + ReplicationGroup group = _environment.getGroup(); + Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes()); + String localNodeName = getNodeName(); + + for (ReplicationNode replicationNode : nodes) + { + String discoveredNodeName = replicationNode.getName(); + if (!discoveredNodeName.equals(localNodeName)) + { + _remoteReplicationNodes.put(replicationNode.getName(), replicationNode); + } + } + } + + private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener) + { + for (ReplicationNode value : _remoteReplicationNodes.values()) + { + listener.onReplicationNodeRecovered(value); + } + } + private class RemoteNodeStateLearner implements Callable<Void> { private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap(); + @Override public Void call() { - final Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>(); try { - Set<Future<Void>> futures = new HashSet<Future<Void>>(); + if (_state.get() == State.OPEN) + { + try + { + detectGroupChangesAndNotify(); + } + catch(DatabaseException e) + { + handleDatabaseException("Exception on replication group check", e); + } + + Map<ReplicationNode, NodeState> nodeStates = discoverNodeStates(_remoteReplicationNodes.values()); + + executeDabasePingerOnNodeChangesIfMaster(nodeStates); - for (final ReplicationNode node : _environment.getGroup().getElectableNodes()) + notifyGroupListenerAboutNodeStates(nodeStates); + } + + } + finally + { + State state = _state.get(); + if (state != State.CLOSED && state != State.CLOSING) { - Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>() + _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + } + } + return null; + } + + private void detectGroupChangesAndNotify() + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName()); + } + + String groupName = _configuration.getGroupName(); + ReplicatedEnvironment env = _environment; + ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get(); + if (env != null) + { + ReplicationGroup group = env.getGroup(); + Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes()); + String localNodeName = getNodeName(); + + Map<String, ReplicationNode> removalMap = new HashMap<String, ReplicationNode>(_remoteReplicationNodes); + for (ReplicationNode replicationNode : nodes) + { + String discoveredNodeName = replicationNode.getName(); + if (!discoveredNodeName.equals(localNodeName)) { - @Override - public Void call() + if (!_remoteReplicationNodes.containsKey(discoveredNodeName)) { - DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL); - ReplicatedEnvironment.State nodeState; - try - { - nodeState = ping.getNodeState().getNodeState(); - } - catch (IOException e) + if (LOGGER.isDebugEnabled()) { - nodeState = ReplicatedEnvironment.State.UNKNOWN; + LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'"); } - catch (ServiceConnectFailedException e) + + _remoteReplicationNodes.put(discoveredNodeName, replicationNode); + + if (replicationGroupListener != null) { - nodeState = ReplicatedEnvironment.State.UNKNOWN; + replicationGroupListener.onReplicationNodeAddedToGroup(replicationNode); } - - currentGroupState.put(node.getName(), nodeState); - return null; } - }); - futures.add(future); + else + { + removalMap.remove(discoveredNodeName); + } + } } - for (Future<Void> future : futures) + if (!removalMap.isEmpty()) { - try + for (Map.Entry<String, ReplicationNode> replicationNodeEntry : removalMap.entrySet()) { - future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - catch (ExecutionException e) - { - LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause()); - } - catch (TimeoutException e) - { - LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName()); - future.cancel(true); + String replicationNodeName = replicationNodeEntry.getKey(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'"); + } + _remoteReplicationNodes.remove(replicationNodeName); + if (replicationGroupListener != null) + { + replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue()); + } } } + } + } + + private Map<ReplicationNode, NodeState> discoverNodeStates(Collection<ReplicationNode> electableNodes) + { + final Map<ReplicationNode, NodeState> nodeStates = new HashMap<ReplicationNode, NodeState>(); + Set<Future<Void>> futures = new HashSet<Future<Void>>(); - if (ReplicatedEnvironment.State.MASTER == _environment.getState()) + for (final ReplicationNode node : electableNodes) + { + Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>() { - boolean stateChanged = !_previousGroupState.equals(currentGroupState); - _previousGroupState = currentGroupState; - if (stateChanged && State.OPEN == _state.get()) + @Override + public Void call() { - new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); + NodeState nodeStateObject = null; + try + { + nodeStateObject = getRemoteNodeState(node); + } + catch (IOException | ServiceConnectFailedException e ) + { + // Cannot discover node states. The node state should be treated as UNKNOWN + } + + nodeStates.put(node, nodeStateObject); + return null; } + }); + futures.add(future); + } + + for (Future<Void> future : futures) + { + try + { + future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause()); + } + catch (TimeoutException e) + { + LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName()); + future.cancel(true); } } - finally + return nodeStates; + } + + private void executeDabasePingerOnNodeChangesIfMaster(final Map<ReplicationNode, NodeState> nodeStates) + { + if (ReplicatedEnvironment.State.MASTER == _environment.getState()) { - _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS); + Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>(); + for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet()) + { + ReplicationNode node = entry.getKey(); + NodeState nodeState = entry.getValue(); + ReplicatedEnvironment.State state = nodeState == null? ReplicatedEnvironment.State.UNKNOWN : nodeState.getNodeState(); + currentGroupState.put(node.getName(), state); + } + boolean stateChanged = !_previousGroupState.equals(currentGroupState); + _previousGroupState = currentGroupState; + if (stateChanged && State.OPEN == _state.get()) + { + new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); + } + } + } + + private void notifyGroupListenerAboutNodeStates(final Map<ReplicationNode, NodeState> nodeStates) + { + ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get(); + if (replicationGroupListener != null) + { + for (Map.Entry<ReplicationNode, NodeState> entry : nodeStates.entrySet()) + { + replicationGroupListener.onNodeState(entry.getKey(), entry.getValue()); + } } - return null; } } + public static enum State { OPENING, diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java new file mode 100644 index 0000000000..e88b9e8651 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicationGroupListener.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.ReplicationNode; + +public interface ReplicationGroupListener +{ + /** + * Fired when a remote replication node is added to a group. This event happens + * exactly once just after a new replication node is created. + */ + void onReplicationNodeAddedToGroup(ReplicationNode node); + + /** + * Fired exactly once for each existing remote node. Used to inform the application + * on any existing nodes as it starts up for the first time. + */ + void onReplicationNodeRecovered(ReplicationNode node); + + /** + * Fired when a remote replication node is (permanently) removed from group. This event + * happens exactly once just after the existing replication node is deleted. + */ + void onReplicationNodeRemovedFromGroup(ReplicationNode node); + + /** + * Invoked to notify listener on node state update + */ + void onNodeState(ReplicationNode node, NodeState nodeState); + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java new file mode 100644 index 0000000000..e18c76b986 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNode.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode.berkeleydb; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.RemoteReplicationNode; + +@ManagedObject(category=false, managesChildren=false, creatable=false) +public interface BDBHARemoteReplicationNode<X extends BDBHARemoteReplicationNode<X>> extends RemoteReplicationNode<X> +{ + String GROUP_NAME = "groupName"; + String ADDRESS = "address"; + String ROLE = "role"; + String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId"; + String JOIN_TIME = "joinTime"; + + @ManagedAttribute(derived = true) + String getGroupName(); + + @ManagedAttribute(derived = true) + String getAddress(); + + @ManagedAttribute(automate = true) + String getRole(); + + @ManagedAttribute(derived = true) + long getJoinTime(); + + @ManagedAttribute(derived = true) + long getLastKnownReplicationTransactionId(); + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java new file mode 100644 index 0000000000..8e94649a1a --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.virtualhostnode.berkeleydb; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; + +import com.sleepycat.je.rep.MasterStateException; +import com.sleepycat.je.rep.ReplicatedEnvironment; + +public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDBHARemoteReplicationNodeImpl> implements BDBHARemoteReplicationNode<BDBHARemoteReplicationNodeImpl> +{ + private static final Logger LOGGER = Logger.getLogger(BDBHARemoteReplicationNodeImpl.class); + + private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; + private final String _address; + + private volatile long _joinTime; + private volatile long _lastTransactionId; + + @ManagedAttributeField(afterSet="afterSetRole") + private volatile String _role; + + private final AtomicReference<State> _state; + + public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) + { + super(parentsMap(virtualHostNode), attributes); + _address = (String)attributes.get(ADDRESS); + _replicatedEnvironmentFacade = replicatedEnvironmentFacade; + _state = new AtomicReference<State>(State.ACTIVE); + } + + @Override + public State getState() + { + return _state.get(); + } + + @Override + public String getGroupName() + { + return _replicatedEnvironmentFacade.getGroupName(); + } + + @Override + public String getAddress() + { + return _address; + } + + @Override + public String getRole() + { + return _role; + } + + @Override + public long getJoinTime() + { + return _joinTime; + } + + @Override + public long getLastKnownReplicationTransactionId() + { + return _lastTransactionId; + } + + public void delete() + { + this.deleted(); + } + + protected void afterSetRole() + { + try + { + String nodeName = getName(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Trying to transfer master to " + nodeName); + } + + _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("The mastership has been transfered to " + nodeName); + } + } + catch(Exception e) + { + throw new IllegalConfigurationException("Cannot transfer mastership to " + getName(), e); + } + } + + @Override + protected boolean setState(State currentState, State desiredState) + { + if (desiredState == State.DELETED) + { + String nodeName = getName(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting node '" + nodeName + "' from group '" + getGroupName() + "'"); + } + + try + { + _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName); + _state.set(State.DELETED); + delete(); + return true; + } + catch(MasterStateException e) + { + throw new IllegalStateTransitionException("Node '" + nodeName + "' cannot be deleted when role is a master"); + } + catch (Exception e) + { + throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e); + } + } + return false; + } + + @Override + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + if (changedAttributes.contains(ROLE)) + { + String currentRole = getRole(); + if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole)) + { + throw new IllegalArgumentException("Cannot transfer mastership when not a replica"); + } + if (!ReplicatedEnvironment.State.MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole())) + { + throw new IllegalArgumentException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported"); + } + } + + if (changedAttributes.contains(JOIN_TIME)) + { + throw new IllegalArgumentException("Cannot change derived attribute " + JOIN_TIME); + } + + if (changedAttributes.contains(LAST_KNOWN_REPLICATION_TRANSACTION_ID)) + { + throw new IllegalArgumentException("Cannot change derived attribute " + LAST_KNOWN_REPLICATION_TRANSACTION_ID); + } + } + + void setRole(String role) + { + _role = role; + } + + void setJoinTime(long joinTime) + { + _joinTime = joinTime; + } + + void setLastTransactionId(long lastTransactionId) + { + _lastTransactionId = lastTransactionId; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 9b72440280..1f647652c2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.security.PrivilegedAction; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -30,12 +31,14 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; -import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.log4j.Logger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -43,6 +46,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; @@ -54,12 +58,14 @@ import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostState; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; @ManagedObject( category = false, type = "BDB_HA" ) -public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl> +public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements + BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl> { /** * Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a @@ -183,7 +189,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public String getRole() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { return environmentFacade.getNodeState(); @@ -194,7 +200,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public Long getLastKnownReplicationTransactionId() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { return environmentFacade.getLastKnownReplicationTransactionId(); @@ -205,7 +211,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public Long getJoinTime() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { return environmentFacade.getJoinTime(); @@ -219,6 +225,14 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return _replicatedEnvironmentConfiguration; } + @SuppressWarnings("rawtypes") + @Override + public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes() + { + Collection<RemoteReplicationNode> remoteNodes = getChildren(RemoteReplicationNode.class); + return (Collection<? extends RemoteReplicationNode>)remoteNodes; + } + @Override public String toString() { @@ -255,6 +269,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return (BDBMessageStore) super.getConfigurationStore(); } + protected ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() + { + return _environmentFacade.get(); + } + + @Override protected DurableConfigurationStore createConfigurationStore() { return new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); @@ -276,8 +296,11 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath())); ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade(); - environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); - _environmentFacade.set(environmentFacade); + if (_environmentFacade.compareAndSet(null, environmentFacade)) + { + environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer()); + } } @Override @@ -289,7 +312,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } finally { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (_environmentFacade.compareAndSet(environmentFacade, null)) { environmentFacade.close(); @@ -421,7 +444,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @SuppressWarnings("unused") private void postSetPriority() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { try @@ -451,7 +474,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @SuppressWarnings("unused") private void postSetDesignatedPrimary() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { try @@ -481,7 +504,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @SuppressWarnings("unused") private void postSetQuorumOverride() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { try @@ -511,7 +534,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @SuppressWarnings("unused") private void preSetRole() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { String currentRole = environmentFacade.getNodeState(); @@ -531,7 +554,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @SuppressWarnings("unused") private void postSetRole() { - ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null) { try @@ -561,6 +584,63 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } + private class RemoteNodesDiscoverer implements ReplicationGroupListener + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); + remoteNode.create(); + childAdded(remoteNode); + } + + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + BDBHARemoteReplicationNodeImpl remoteNode = new BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl.this, nodeToAttributes(node), getReplicatedEnvironmentFacade()); + remoteNode.open(); + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName()); + if (remoteNode != null) + { + remoteNode.delete(); + childRemoved(remoteNode); + } + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + BDBHARemoteReplicationNodeImpl remoteNode = getChildByName(BDBHARemoteReplicationNodeImpl.class, node.getName()); + if (remoteNode != null) + { + if (nodeState == null) + { + remoteNode.setRole(ReplicatedEnvironment.State.UNKNOWN.name()); + } + else + { + remoteNode.setRole(nodeState.getNodeState().name()); + remoteNode.setJoinTime(nodeState.getJoinTime()); + remoteNode.setLastTransactionId(nodeState.getKnownMasterTxnEndVLSN()); + } + } + } + + private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode) + { + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(ConfiguredObject.NAME, replicationNode.getName()); + attributes.put(ConfiguredObject.DURABLE, false); + attributes.put(BDBHARemoteReplicationNode.ADDRESS, replicationNode.getHostName() + ":" + replicationNode.getPort()); + return attributes; + } + } + private class ReplicaVirtualHost extends BDBHAVirtualHost { ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index d296e9c2b7..826694904c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import static org.mockito.Mockito.when; import java.io.File; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,11 +40,13 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -159,8 +162,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase @Override public void childAdded(ConfiguredObject object, ConfiguredObject child) { - child.addChangeListener(this); - virtualHostAddedLatch.countDown(); + if (child instanceof VirtualHost) + { + child.addChangeListener(this); + virtualHostAddedLatch.countDown(); + } } @Override @@ -314,7 +320,100 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase while(!"MASTER".equals(replica.getRole())) { Thread.sleep(100); - if (awaitMastershipCount > 20) + if (awaitMastershipCount > 50) + { + fail("Replica did not assume master role"); + } + awaitMastershipCount++; + } + } + + + public void testTransferMasterToReplica() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> node1Attributes = new HashMap<String, Object>(); + node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1"); + node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + + BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes); + assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE)); + + final CountDownLatch remoteNodeLatch = new CountDownLatch(2); + node1.addChangeListener(new ConfigurationChangeListener() + { + @Override + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + } + + @Override + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + } + + @Override + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + if (child instanceof RemoteReplicationNode) + { + remoteNodeLatch.countDown(); + } + } + + @Override + public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, + Object newAttributeValue) + { + } + }); + + int node2PortNumber = getNextAvailable(node1PortNumber+1); + + Map<String, Object> node2Attributes = new HashMap<String, Object>(); + node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2"); + node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); + + BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes); + assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE)); + + int node3PortNumber = getNextAvailable(node2PortNumber+1); + Map<String, Object> node3Attributes = new HashMap<String, Object>(); + node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3"); + node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber); + node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3"); + BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes); + assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE)); + + assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS)); + + Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes(); + RemoteReplicationNode replicaRemoteNode = remoteNodes.iterator().next(); + replicaRemoteNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER"); + + BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3; + int awaitMastershipCount = 0; + while(!"MASTER".equals(replica.getRole())) + { + Thread.sleep(100); + if (awaitMastershipCount > 50) { fail("Replica did not assume master role"); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 1becfe3ede..7f6083a0c9 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -24,12 +24,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; @@ -40,15 +42,17 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicatedEnvironment.State; import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { - private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); private static final int LISTENER_TIMEOUT = 5; private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; @@ -220,6 +224,304 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); } + public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + String nodeName2 = TEST_NODE_NAME + "_2"; + String host = "localhost"; + int port = getNextAvailable(TEST_NODE_PORT + 1); + String node2NodeHostPort = host + ":" + port; + + final AtomicInteger invocationCount = new AtomicInteger(); + final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + nodeRecoveryLatch.countDown(); + invocationCount.incrementAndGet(); + } + }; + + createReplica(nodeName2, node2NodeHostPort, listener); + + assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers()); + + assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testReplicationGroupListenerHearsNodeAdded() throws Exception + { + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicInteger invocationCount = new AtomicInteger(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + invocationCount.getAndIncrement(); + nodeAddedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testReplicationGroupListenerHearsNodeRemoved() throws Exception + { + final CountDownLatch nodeDeletedLatch = new CountDownLatch(1); + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicInteger invocationCount = new AtomicInteger(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + nodeAddedLatch.countDown(); + } + + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + nodeAddedLatch.countDown(); + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + invocationCount.getAndIncrement(); + nodeDeletedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + // Need to await the listener hearing the addition of the node to the model. + assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + // Now remove the node and ensure we hear the event + replicatedEnvironmentFacade.removeNodeFromGroup(node2Name); + + assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testMasterHearsRemoteNodeRoles() throws Exception + { + final String node2Name = TEST_NODE_NAME + "_2"; + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>(); + final CountDownLatch stateLatch = new CountDownLatch(1); + final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + nodeRef.set(node); + nodeAddedLatch.countDown(); + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + if (node2Name.equals(node.getName())) + { + stateRef.set(nodeState); + stateLatch.countDown(); + } + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + ReplicationNode remoteNode = (ReplicationNode)nodeRef.get(); + assertEquals("Unexpcted node name", node2Name, remoteNode.getName()); + + assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpcted node state", State.REPLICA, stateRef.get().getNodeState()); + } + + public void testRemoveNodeFromGroup() throws Exception + { + ReplicatedEnvironmentFacade environmentFacade = createMaster(); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1); + ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers()); + ref2.close(); + + environmentFacade.removeNodeFromGroup(node2Name); + assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers()); + } + + + public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception + { + final String replicaName = TEST_NODE_NAME + "_1"; + final CountDownLatch nodeRemovedLatch = new CountDownLatch(1); + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>(); + final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>(); + final CountDownLatch stateLatch = new CountDownLatch(1); + final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>(); + + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + if (addedNodeRef.compareAndSet(null, node)) + { + nodeAddedLatch.countDown(); + } + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + removedNodeRef.set(node); + nodeRemovedLatch.countDown(); + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + if (replicaName.equals(node.getName())) + { + stateRef.set(nodeState); + stateLatch.countDown(); + } + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + masterEnvironment.setDesignatedPrimary(true); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + + ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener()); + + assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + ReplicationNode node = addedNodeRef.get(); + assertEquals("Unexpected node name", replicaName, node.getName()); + + assertTrue("Node state was not heared", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState()); + assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName()); + + replica.close(); + masterEnvironment.removeNodeFromGroup(node.getName()); + + assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected node is deleted", node, removedNodeRef.get()); + } + + public void testCloseStateTransitions() throws Exception + { + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); + + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); + replicatedEnvironmentFacade.close(); + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); + } + + public void testEnvironmentRestartOnInsufficientReplicas() throws Exception + { + + ReplicatedEnvironmentFacade master = createMaster(); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String replica1NodeName = TEST_NODE_NAME + "_1"; + String replica1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener()); + + int replica2Port = getNextAvailable(replica1Port + 1); + String replica2NodeName = TEST_NODE_NAME + "_2"; + String replica2NodeHostPort = "localhost:" + replica2Port; + ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener()); + + String databaseName = "test"; + + DatabaseConfig dbConfig = createDatabase(master, databaseName); + + // close replicas + replica1.close(); + replica2.close(); + + Environment e = master.getEnvironment(); + master.getOpenDatabase(databaseName); + try + { + master.openDatabases(dbConfig, "test2"); + fail("Opening of new database without quorum should fail"); + } + catch(InsufficientReplicasException ex) + { + master.handleDatabaseException(null, ex); + } + + EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA); + replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener()); + replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener()); + + // Need to poll to await the remote node updating itself + long timeout = System.currentTimeMillis() + 5000; + while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) + { + Thread.sleep(200); + } + + assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), + State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); + + Environment e2 = master.getEnvironment(); + assertNotSame("Environment has not been restarted", e2, e); + } + public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception { final CountDownLatch masterLatch = new CountDownLatch(1); @@ -244,7 +546,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } }; - addNode(State.MASTER, stateChangeListener); + addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); @@ -252,8 +554,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase int replica2Port = getNextAvailable(replica1Port + 1); String node2NodeHostPort = "localhost:" + replica2Port; - ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); - ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); + ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); + ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener()); // close replicas replica1.close(); @@ -266,15 +568,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); } - public void testCloseStateTransitions() throws Exception - { - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); - - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); - replicatedEnvironmentFacade.close(); - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); - } - public void testTransferMasterToSelf() throws Exception { final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); @@ -295,12 +588,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } }; - ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener); + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); String node1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); int replica2Port = getNextAvailable(replica1Port + 1); @@ -323,7 +616,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } }; - ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); @@ -353,12 +646,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } }; - ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener); + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); String node1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); int replica2Port = getNextAvailable(replica1Port + 1); @@ -382,7 +675,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } }; String thirdNodeName = TEST_NODE_NAME + "_2"; - ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); @@ -394,34 +687,56 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private ReplicatedEnvironmentFacade createMaster() throws Exception { + return createMaster(new NoopReplicationGroupListener()); + } + + private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception + { TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener); + ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, replicationGroupListener); assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); return env; } - private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception { TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); - ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener); + } + + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, + TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener) + throws InterruptedException + { + ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener); boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange); return replicaEnvironmentFacade; } private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, - State desiredState, StateChangeListener stateChangeListener) + State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null); ref.setStateChangeListener(stateChangeListener); + ref.setReplicationGroupListener(replicationGroupListener); _nodes.put(nodeName, ref); return ref; } - private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener) + private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) + { + return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener); + } + + private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) { - return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + environmentFacade.openDatabases(dbConfig, databaseName); + return dbConfig; } private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) @@ -444,4 +759,29 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); return node; } + + class NoopReplicationGroupListener implements ReplicationGroupListener + { + + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + } + + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + } + + } } |
