diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-04-30 13:08:49 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-04-30 13:08:49 +0000 |
| commit | aeb2793707799a2d459888fc0ed0dd9faf1c7c78 (patch) | |
| tree | c921c90de8252dc3ed3d5ec8e47bcdd42c73cf9c /qpid/java/bdbstore/src/test | |
| parent | 472d84f65a13f2882252cf85fbfb99457294518f (diff) | |
| download | qpid-python-aeb2793707799a2d459888fc0ed0dd9faf1c7c78.tar.gz | |
QPID-5715,QPID-5412: Add remote replication nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1591281 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
2 files changed, 467 insertions, 28 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index d296e9c2b7..826694904c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import static org.mockito.Mockito.when; import java.io.File; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,11 +40,13 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -159,8 +162,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase @Override public void childAdded(ConfiguredObject object, ConfiguredObject child) { - child.addChangeListener(this); - virtualHostAddedLatch.countDown(); + if (child instanceof VirtualHost) + { + child.addChangeListener(this); + virtualHostAddedLatch.countDown(); + } } @Override @@ -314,7 +320,100 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase while(!"MASTER".equals(replica.getRole())) { Thread.sleep(100); - if (awaitMastershipCount > 20) + if (awaitMastershipCount > 50) + { + fail("Replica did not assume master role"); + } + awaitMastershipCount++; + } + } + + + public void testTransferMasterToReplica() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> node1Attributes = new HashMap<String, Object>(); + node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1"); + node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + + BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes); + assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE)); + + final CountDownLatch remoteNodeLatch = new CountDownLatch(2); + node1.addChangeListener(new ConfigurationChangeListener() + { + @Override + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + } + + @Override + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + } + + @Override + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + if (child instanceof RemoteReplicationNode) + { + remoteNodeLatch.countDown(); + } + } + + @Override + public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, + Object newAttributeValue) + { + } + }); + + int node2PortNumber = getNextAvailable(node1PortNumber+1); + + Map<String, Object> node2Attributes = new HashMap<String, Object>(); + node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2"); + node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); + + BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes); + assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE)); + + int node3PortNumber = getNextAvailable(node2PortNumber+1); + Map<String, Object> node3Attributes = new HashMap<String, Object>(); + node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3"); + node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber); + node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3"); + BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes); + assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE)); + + assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS)); + + Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes(); + RemoteReplicationNode replicaRemoteNode = remoteNodes.iterator().next(); + replicaRemoteNode.setAttribute(BDBHARemoteReplicationNode.ROLE, "REPLICA", "MASTER"); + + BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3; + int awaitMastershipCount = 0; + while(!"MASTER".equals(replica.getRole())) + { + Thread.sleep(100); + if (awaitMastershipCount > 50) { fail("Replica did not assume master role"); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 1becfe3ede..7f6083a0c9 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -24,12 +24,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; @@ -40,15 +42,17 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicatedEnvironment.State; import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { - private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); private static final int LISTENER_TIMEOUT = 5; private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; @@ -220,6 +224,304 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); } + public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + String nodeName2 = TEST_NODE_NAME + "_2"; + String host = "localhost"; + int port = getNextAvailable(TEST_NODE_PORT + 1); + String node2NodeHostPort = host + ":" + port; + + final AtomicInteger invocationCount = new AtomicInteger(); + final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + nodeRecoveryLatch.countDown(); + invocationCount.incrementAndGet(); + } + }; + + createReplica(nodeName2, node2NodeHostPort, listener); + + assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers()); + + assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testReplicationGroupListenerHearsNodeAdded() throws Exception + { + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicInteger invocationCount = new AtomicInteger(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + invocationCount.getAndIncrement(); + nodeAddedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testReplicationGroupListenerHearsNodeRemoved() throws Exception + { + final CountDownLatch nodeDeletedLatch = new CountDownLatch(1); + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicInteger invocationCount = new AtomicInteger(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + nodeAddedLatch.countDown(); + } + + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + nodeAddedLatch.countDown(); + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + invocationCount.getAndIncrement(); + nodeDeletedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + // Need to await the listener hearing the addition of the node to the model. + assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + // Now remove the node and ensure we hear the event + replicatedEnvironmentFacade.removeNodeFromGroup(node2Name); + + assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + assertEquals("Unexpected number of listener invocations", 1, invocationCount.get()); + } + + public void testMasterHearsRemoteNodeRoles() throws Exception + { + final String node2Name = TEST_NODE_NAME + "_2"; + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>(); + final CountDownLatch stateLatch = new CountDownLatch(1); + final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + nodeRef.set(node); + nodeAddedLatch.countDown(); + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + if (node2Name.equals(node.getName())) + { + stateRef.set(nodeState); + stateLatch.countDown(); + } + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1); + createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers()); + + assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + ReplicationNode remoteNode = (ReplicationNode)nodeRef.get(); + assertEquals("Unexpcted node name", node2Name, remoteNode.getName()); + + assertTrue("Node state not fired within timeout", stateLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpcted node state", State.REPLICA, stateRef.get().getNodeState()); + } + + public void testRemoveNodeFromGroup() throws Exception + { + ReplicatedEnvironmentFacade environmentFacade = createMaster(); + + String node2Name = TEST_NODE_NAME + "_2"; + String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1); + ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener()); + + assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers()); + ref2.close(); + + environmentFacade.removeNodeFromGroup(node2Name); + assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers()); + } + + + public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception + { + final String replicaName = TEST_NODE_NAME + "_1"; + final CountDownLatch nodeRemovedLatch = new CountDownLatch(1); + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>(); + final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>(); + final CountDownLatch stateLatch = new CountDownLatch(1); + final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>(); + + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + if (addedNodeRef.compareAndSet(null, node)) + { + nodeAddedLatch.countDown(); + } + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + removedNodeRef.set(node); + nodeRemovedLatch.countDown(); + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + if (replicaName.equals(node.getName())) + { + stateRef.set(nodeState); + stateLatch.countDown(); + } + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + masterEnvironment.setDesignatedPrimary(true); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + + ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener()); + + assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + ReplicationNode node = addedNodeRef.get(); + assertEquals("Unexpected node name", replicaName, node.getName()); + + assertTrue("Node state was not heared", stateLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState()); + assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName()); + + replica.close(); + masterEnvironment.removeNodeFromGroup(node.getName()); + + assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected node is deleted", node, removedNodeRef.get()); + } + + public void testCloseStateTransitions() throws Exception + { + ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); + + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); + replicatedEnvironmentFacade.close(); + assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); + } + + public void testEnvironmentRestartOnInsufficientReplicas() throws Exception + { + + ReplicatedEnvironmentFacade master = createMaster(); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String replica1NodeName = TEST_NODE_NAME + "_1"; + String replica1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new NoopReplicationGroupListener()); + + int replica2Port = getNextAvailable(replica1Port + 1); + String replica2NodeName = TEST_NODE_NAME + "_2"; + String replica2NodeHostPort = "localhost:" + replica2Port; + ReplicatedEnvironmentFacade replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new NoopReplicationGroupListener()); + + String databaseName = "test"; + + DatabaseConfig dbConfig = createDatabase(master, databaseName); + + // close replicas + replica1.close(); + replica2.close(); + + Environment e = master.getEnvironment(); + master.getOpenDatabase(databaseName); + try + { + master.openDatabases(dbConfig, "test2"); + fail("Opening of new database without quorum should fail"); + } + catch(InsufficientReplicasException ex) + { + master.handleDatabaseException(null, ex); + } + + EnumSet<State> states = EnumSet.of(State.MASTER, State.REPLICA); + replica1 = createReplica(replica1NodeName, replica1NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener()); + replica2 = createReplica(replica2NodeName, replica2NodeHostPort, new TestStateChangeListener(states), new NoopReplicationGroupListener()); + + // Need to poll to await the remote node updating itself + long timeout = System.currentTimeMillis() + 5000; + while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) + { + Thread.sleep(200); + } + + assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), + State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); + + Environment e2 = master.getEnvironment(); + assertNotSame("Environment has not been restarted", e2, e); + } + public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception { final CountDownLatch masterLatch = new CountDownLatch(1); @@ -244,7 +546,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } }; - addNode(State.MASTER, stateChangeListener); + addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS)); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); @@ -252,8 +554,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase int replica2Port = getNextAvailable(replica1Port + 1); String node2NodeHostPort = "localhost:" + replica2Port; - ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); - ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort); + ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); + ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener()); // close replicas replica1.close(); @@ -266,15 +568,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); } - public void testCloseStateTransitions() throws Exception - { - ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); - - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState()); - replicatedEnvironmentFacade.close(); - assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); - } - public void testTransferMasterToSelf() throws Exception { final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); @@ -295,12 +588,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } }; - ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener); + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); String node1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); int replica2Port = getNextAvailable(replica1Port + 1); @@ -323,7 +616,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } }; - ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); @@ -353,12 +646,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } } }; - ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener); + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); String node1NodeHostPort = "localhost:" + replica1Port; - ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener()); assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); int replica2Port = getNextAvailable(replica1Port + 1); @@ -382,7 +675,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } }; String thirdNodeName = TEST_NODE_NAME + "_2"; - ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener()); assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); @@ -394,34 +687,56 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private ReplicatedEnvironmentFacade createMaster() throws Exception { + return createMaster(new NoopReplicationGroupListener()); + } + + private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception + { TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); - ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener); + ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, replicationGroupListener); assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); return env; } - private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception { TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA); - ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener); + } + + private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, + TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener) + throws InterruptedException + { + ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener); boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS); assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange); return replicaEnvironmentFacade; } private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, - State desiredState, StateChangeListener stateChangeListener) + State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null); ref.setStateChangeListener(stateChangeListener); + ref.setReplicationGroupListener(replicationGroupListener); _nodes.put(nodeName, ref); return ref; } - private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener) + private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) + { + return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, replicationGroupListener); + } + + private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) { - return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + environmentFacade.openDatabases(dbConfig, databaseName); + return dbConfig; } private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) @@ -444,4 +759,29 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); return node; } + + class NoopReplicationGroupListener implements ReplicationGroupListener + { + + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + } + + @Override + public void onReplicationNodeRecovered(ReplicationNode node) + { + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + } + + @Override + public void onNodeState(ReplicationNode node, NodeState nodeState) + { + } + + } } |
