summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/test
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-04-30 13:08:49 +0000
committerAlex Rudyy <orudyy@apache.org>2014-04-30 13:08:49 +0000
commitaeb2793707799a2d459888fc0ed0dd9faf1c7c78 (patch)
treec921c90de8252dc3ed3d5ec8e47bcdd42c73cf9c /qpid/java/bdbstore/src/test
parent472d84f65a13f2882252cf85fbfb99457294518f (diff)
downloadqpid-python-aeb2793707799a2d459888fc0ed0dd9faf1c7c78.tar.gz
QPID-5715,QPID-5412: Add remote replication nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1591281 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java105
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java390
2 files changed, 467 insertions, 28 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index d296e9c2b7..826694904c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,11 +40,13 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -159,8 +162,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
@Override
public void childAdded(ConfiguredObject object, ConfiguredObject child)
{
- child.addChangeListener(this);
- virtualHostAddedLatch.countDown();
+ if (child instanceof VirtualHost)
+ {
+ child.addChangeListener(this);
+ virtualHostAddedLatch.countDown();
+ }
}
@Override
@@ -314,7 +320,100 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
while(!"MASTER".equals(replica.getRole()))
{
Thread.sleep(100);
- if (awaitMastershipCount > 20)
+ if (awaitMastershipCount > 50)
+ {
+ fail("Replica did not assume master role");
+ }
+ awaitMastershipCount++;
+ }
+ }
+
+
+ public void testTransferMasterToReplica() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> node1Attributes = new HashMap<String, Object>();
+ node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+ BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
+ assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE));
+
+ final CountDownLatch remoteNodeLatch = new CountDownLatch(2);
+ node1.addChangeListener(new ConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(ConfiguredObject object, State oldState, State newState)
+ {
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+ {
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ if (child instanceof RemoteReplicationNode)
+ {
+ remoteNodeLatch.countDown();
+ }
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue,
+ Object newAttributeValue)
+ {
+ }
+ });
+
+ int node2PortNumber = getNextAvailable(node1PortNumber+1);
+
+ Map<String, Object> node2Attributes = new HashMap<String, Object>();
+ node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+ node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+
+ BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
+ assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE));
+
+ int node3PortNumber = getNextAvailable(node2PortNumber+1);
+ Map<String, Object> node3Attributes = new HashMap<String, Object>();
+ node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3");
+ node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber);
+ node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
+ BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
+ assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE));
+
+ assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
+
+ Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes();
+ RemoteReplicationNode replicaRemoteNode = remoteNodes.iterator().next();
+ replicaRemoteNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER");
+
+ BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3;
+ int awaitMastershipCount = 0;
+ while(!"MASTER".equals(replica.getRole()))
+ {
+ Thread.sleep(100);
+ if (awaitMastershipCount > 50)
{
fail("Replica did not assume master role");
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 1becfe3ede..7f6083a0c9 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -24,12 +24,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -40,15 +42,17 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
-
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -220,6 +224,304 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
}
+ public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ String nodeName2 = TEST_NODE_NAME + "_2";
+ String host = "localhost";
+ int port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node2NodeHostPort = host + ":" + port;
+
+ final AtomicInteger invocationCount = new AtomicInteger();
+ final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ nodeRecoveryLatch.countDown();
+ invocationCount.incrementAndGet();
+ }
+ };
+
+ createReplica(nodeName2, node2NodeHostPort, listener);
+
+ assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers());
+
+ assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+ }
+
+ public void testReplicationGroupListenerHearsNodeAdded() throws Exception
+ {
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicInteger invocationCount = new AtomicInteger();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ invocationCount.getAndIncrement();
+ nodeAddedLatch.countDown();
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+ }
+
+ public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
+ {
+ final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicInteger invocationCount = new AtomicInteger();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ nodeAddedLatch.countDown();
+ }
+
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ nodeAddedLatch.countDown();
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ invocationCount.getAndIncrement();
+ nodeDeletedLatch.countDown();
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ // Need to await the listener hearing the addition of the node to the model.
+ assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ // Now remove the node and ensure we hear the event
+ replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
+
+ assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
+ }
+
+ public void testMasterHearsRemoteNodeRoles() throws Exception
+ {
+ final String node2Name = TEST_NODE_NAME + "_2";
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
+ final CountDownLatch stateLatch = new CountDownLatch(1);
+ final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ nodeRef.set(node);
+ nodeAddedLatch.countDown();
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ if (node2Name.equals(node.getName()))
+ {
+ stateRef.set(nodeState);
+ stateLatch.countDown();
+ }
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+ createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
+
+ assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ ReplicationNode remoteNode = (ReplicationNode)nodeRef.get();
+ assertEquals("Unexpcted node name", node2Name, remoteNode.getName());
+
+ assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpcted node state", State.REPLICA, stateRef.get().getNodeState());
+ }
+
+ public void testRemoveNodeFromGroup() throws Exception
+ {
+ ReplicatedEnvironmentFacade environmentFacade = createMaster();
+
+ String node2Name = TEST_NODE_NAME + "_2";
+ String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
+ ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
+
+ assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers());
+ ref2.close();
+
+ environmentFacade.removeNodeFromGroup(node2Name);
+ assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers());
+ }
+
+
+ public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
+ {
+ final String replicaName = TEST_NODE_NAME + "_1";
+ final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
+ final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
+ final CountDownLatch stateLatch = new CountDownLatch(1);
+ final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
+
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ if (addedNodeRef.compareAndSet(null, node))
+ {
+ nodeAddedLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ removedNodeRef.set(node);
+ nodeRemovedLatch.countDown();
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ if (replicaName.equals(node.getName()))
+ {
+ stateRef.set(nodeState);
+ stateLatch.countDown();
+ }
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ masterEnvironment.setDesignatedPrimary(true);
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+
+ ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener());
+
+ assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+ ReplicationNode node = addedNodeRef.get();
+ assertEquals("Unexpected node name", replicaName, node.getName());
+
+ assertTrue("Node state was not heared", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState());
+ assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName());
+
+ replica.close();
+ masterEnvironment.removeNodeFromGroup(node.getName());
+
+ assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
+ }
+
+ public void testCloseStateTransitions() throws Exception
+ {
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
+
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+ replicatedEnvironmentFacade.close();
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
+ }
+
+ public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
+ {
+
+ ReplicatedEnvironmentFacade master = createMaster();
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String replica1NodeName = TEST_NODE_NAME + "_1";
+ String replica1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener());
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String replica2NodeName = TEST_NODE_NAME + "_2";
+ String replica2NodeHostPort = "localhost:" + replica2Port;
+ ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener());
+
+ String databaseName = "test";
+
+ DatabaseConfig dbConfig = createDatabase(master, databaseName);
+
+ // close replicas
+ replica1.close();
+ replica2.close();
+
+ Environment e = master.getEnvironment();
+ master.getOpenDatabase(databaseName);
+ try
+ {
+ master.openDatabases(dbConfig, "test2");
+ fail("Opening of new database without quorum should fail");
+ }
+ catch(InsufficientReplicasException ex)
+ {
+ master.handleDatabaseException(null, ex);
+ }
+
+ EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA);
+ replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
+ replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener());
+
+ // Need to poll to await the remote node updating itself
+ long timeout = System.currentTimeMillis() + 5000;
+ while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
+ {
+ Thread.sleep(200);
+ }
+
+ assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
+ State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
+
+ Environment e2 = master.getEnvironment();
+ assertNotSame("Environment has not been restarted", e2, e);
+ }
+
public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
{
final CountDownLatch masterLatch = new CountDownLatch(1);
@@ -244,7 +546,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
};
- addNode(State.MASTER, stateChangeListener);
+ addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
@@ -252,8 +554,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
int replica2Port = getNextAvailable(replica1Port + 1);
String node2NodeHostPort = "localhost:" + replica2Port;
- ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
- ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
+ ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
+ ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener());
// close replicas
replica1.close();
@@ -266,15 +568,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
}
- public void testCloseStateTransitions() throws Exception
- {
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
-
- assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
- replicatedEnvironmentFacade.close();
- assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
- }
-
public void testTransferMasterToSelf() throws Exception
{
final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
@@ -295,12 +588,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
}
};
- ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = getNextAvailable(replica1Port + 1);
@@ -323,7 +616,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
}
};
- ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
@@ -353,12 +646,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
}
};
- ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
- ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = getNextAvailable(replica1Port + 1);
@@ -382,7 +675,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
};
String thirdNodeName = TEST_NODE_NAME + "_2";
- ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
@@ -394,34 +687,56 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
+ return createMaster(new NoopReplicationGroupListener());
+ }
+
+ private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception
+ {
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
- ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener);
+ ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, replicationGroupListener);
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
return env;
}
- private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception
{
TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
- ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener);
+ }
+
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort,
+ TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener)
+ throws InterruptedException
+ {
+ ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener);
boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
return replicaEnvironmentFacade;
}
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
- State desiredState, StateChangeListener stateChangeListener)
+ State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
ref.setStateChangeListener(stateChangeListener);
+ ref.setReplicationGroupListener(replicationGroupListener);
_nodes.put(nodeName, ref);
return ref;
}
- private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener)
+ private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
+ {
+ return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener);
+ }
+
+ private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName)
{
- return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener);
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ environmentFacade.openDatabases(dbConfig, databaseName);
+ return dbConfig;
}
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
@@ -444,4 +759,29 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
}
+
+ class NoopReplicationGroupListener implements ReplicationGroupListener
+ {
+
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ }
+
+ @Override
+ public void onReplicationNodeRecovered(ReplicationNode node)
+ {
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ }
+
+ @Override
+ public void onNodeState(ReplicationNode node, NodeState nodeState)
+ {
+ }
+
+ }
}