diff options
Diffstat (limited to 'zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java')
-rw-r--r-- | zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java | 441 |
1 files changed, 207 insertions, 234 deletions
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java index f72063418..1bcf5835e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,13 @@ package org.apache.zookeeper.test; import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -30,9 +36,27 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - +import javax.management.Attribute; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.InvalidAttributeValueException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.management.RuntimeMBeanException; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.admin.ZooKeeperAdmin; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.jmx.ZKMBeanInfo; import org.apache.zookeeper.server.admin.Commands; @@ -41,35 +65,15 @@ import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; import org.apache.zookeeper.server.util.PortForwarder; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.ConnectionLossException; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooKeeper.States; -import org.apache.zookeeper.admin.ZooKeeperAdmin; -import org.junit.Assert; -import org.junit.Test; - -import javax.management.Attribute; -import javax.management.AttributeNotFoundException; -import javax.management.InstanceNotFoundException; -import javax.management.InvalidAttributeValueException; -import javax.management.MBeanException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.ReflectionException; -import javax.management.RuntimeMBeanException; @RunWith(Parameterized.class) -public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ +public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher { + protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class); public ObserverMasterTest(Boolean testObserverMaster) { @@ -77,9 +81,8 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } @Parameterized.Parameters - public static List<Object []> data() { return Arrays.asList(new Object [][] { - {Boolean.TRUE}, - {Boolean.FALSE}}); + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); } private Boolean testObserverMaster; @@ -112,15 +115,15 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ OM_PORT = PortAssignment.unique(); - String quorumCfgSection = - "server.1=127.0.0.1:" + (PORT_QP1) - + ":" + (PORT_QP_LE1) + ";" + CLIENT_PORT_QP1 - + "\nserver.2=127.0.0.1:" + (PORT_QP2) - + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2 - + "\nserver.3=127.0.0.1:" - + (PORT_OBS)+ ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS; - String extraCfgs = testObserverMaster ? String.format("observerMasterPort=%d%n", OM_PORT) : ""; - String extraCfgsObs = testObserverMaster ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort) : ""; + String quorumCfgSection = "server.1=127.0.0.1:" + (PORT_QP1) + ":" + (PORT_QP_LE1) + ";" + CLIENT_PORT_QP1 + + "\nserver.2=127.0.0.1:" + (PORT_QP2) + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2 + + "\nserver.3=127.0.0.1:" + (PORT_OBS) + ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS; + String extraCfgs = testObserverMaster + ? String.format("observerMasterPort=%d%n", OM_PORT) + : ""; + String extraCfgsObs = testObserverMaster + ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort) + : ""; PortForwarder forwarder = null; if (testObserverMaster && omProxyPort >= 0) { @@ -132,12 +135,12 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs); q1.start(); q2.start(); - Assert.assertTrue("waiting for server 1 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, - CONNECTION_TIMEOUT)); - Assert.assertTrue("waiting for server 2 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server 1 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server 2 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT)); return forwarder; } @@ -149,15 +152,15 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ q2.shutdown(); q3.shutdown(); - Assert.assertTrue("Waiting for server 1 to shut down", - ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP1, - ClientBase.CONNECTION_TIMEOUT)); - Assert.assertTrue("Waiting for server 2 to shut down", - ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2, - ClientBase.CONNECTION_TIMEOUT)); - Assert.assertTrue("Waiting for server 3 to shut down", - ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_OBS, - ClientBase.CONNECTION_TIMEOUT)); + assertTrue( + "Waiting for server 1 to shut down", + ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT)); + assertTrue( + "Waiting for server 2 to shut down", + ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT)); + assertTrue( + "Waiting for server 3 to shut down", + ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT)); } @Test @@ -182,23 +185,21 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } // ensure the observer master has commits in the queue before observer sync - zk = new ZooKeeper("127.0.0.1:" + leaderPort, - ClientBase.CONNECTION_TIMEOUT, this); + zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this); for (int i = 0; i < 10; i++) { zk.create("/bulk" + i, ("initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.close(); q3.start(); - Assert.assertTrue("waiting for server 3 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); latch = new CountDownLatch(1); - zk = new ZooKeeper("127.0.0.1:" + leaderPort, - ClientBase.CONNECTION_TIMEOUT, this); + zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this); latch.await(); - Assert.assertEquals(zk.getState(), States.CONNECTED); + assertEquals(zk.getState(), States.CONNECTED); zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); final long lastLoggedZxid = leader.getQuorumPeer().getLastLoggedZxid(); @@ -216,8 +217,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } for (int i = 0; i < 10; i++) { - zk.create("/basic" + i, "second".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zk.create("/basic" + i, "second".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } DelayRequestProcessor delayRequestProcessor = null; @@ -226,37 +226,34 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ delayRequestProcessor = DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer); } - zk.create("/target1", "third".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - zk.create("/target2", "third".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zk.create("/target1", "third".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/target2", "third".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - LOG.info("observer zxid " + Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()) + - (testObserverMaster ? "" : " observer master zxid " + - Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())) + - " leader zxid " + Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid())); + LOG.info("observer zxid " + + Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()) + + (testObserverMaster ? "" : " observer master zxid " + Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())) + + " leader zxid " + + Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid())); // restore network forwarder = testObserverMaster ? new PortForwarder(OM_PROXY_PORT, OM_PORT) : null; - Assert.assertTrue("waiting for server 3 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); - Assert.assertNotNull("Leader switched", leader.getQuorumPeer().leader); + assertTrue( + "waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); + assertNotNull("Leader switched", leader.getQuorumPeer().leader); if (delayRequestProcessor != null) { delayRequestProcessor.unblockQueue(); } latch = new CountDownLatch(1); - ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, - ClientBase.CONNECTION_TIMEOUT, this); + ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this); latch.await(); - zk.create("/finalop", "fourth".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zk.create("/finalop", "fourth".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Assert.assertEquals("first", new String(obsZk.getData("/init", null, null))); - Assert.assertEquals("third", new String(obsZk.getData("/target1", null, null))); + assertEquals("first", new String(obsZk.getData("/init", null, null))); + assertEquals("third", new String(obsZk.getData("/target1", null, null))); obsZk.close(); shutdown(); @@ -283,53 +280,48 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ latch = new CountDownLatch(2); setUp(-1); q3.start(); - Assert.assertTrue("waiting for server 3 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); if (testObserverMaster) { int masterPort = q3.getQuorumPeer().observer.getSocket().getPort(); LOG.info("port " + masterPort + " " + OM_PORT); - Assert.assertEquals("observer failed to connect to observer master", masterPort, OM_PORT); + assertEquals("observer failed to connect to observer master", masterPort, OM_PORT); } - zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, - ClientBase.CONNECTION_TIMEOUT, this); - zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this); + zk.create("/obstest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // Assert that commands are getting forwarded correctly - Assert.assertEquals(new String(zk.getData("/obstest", null, null)), "test"); + assertEquals(new String(zk.getData("/obstest", null, null)), "test"); // Now check that other commands don't blow everything up zk.sync("/", null, null); zk.setData("/obstest", "test2".getBytes(), -1); zk.getChildren("/", false); - Assert.assertEquals(zk.getState(), States.CONNECTED); + assertEquals(zk.getState(), States.CONNECTED); LOG.info("Shutting down server 2"); // Now kill one of the other real servers q2.shutdown(); - Assert.assertTrue("Waiting for server 2 to shut down", - ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2, - ClientBase.CONNECTION_TIMEOUT)); + assertTrue( + "Waiting for server 2 to shut down", + ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT)); LOG.info("Server 2 down"); // Now the resulting ensemble shouldn't be quorate latch.await(); - Assert.assertNotSame("Client is still connected to non-quorate cluster", - KeeperState.SyncConnected,lastEvent.getState()); + assertNotSame("Client is still connected to non-quorate cluster", KeeperState.SyncConnected, lastEvent.getState()); LOG.info("Latch returned"); try { - Assert.assertNotEquals("Shouldn't get a response when cluster not quorate!", - "test", new String(zk.getData("/obstest", null, null))); - } - catch (ConnectionLossException c) { + assertNotEquals("Shouldn't get a response when cluster not quorate!", "test", new String(zk.getData("/obstest", null, null))); + } catch (ConnectionLossException c) { LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)"); } @@ -342,19 +334,19 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ q2.start(); LOG.info("Waiting for server 2 to come up"); - Assert.assertTrue("waiting for server 2 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server 2 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT)); LOG.info("Server 2 started, waiting for latch"); latch.await(); // It's possible our session expired - but this is ok, shows we // were able to talk to the ensemble - Assert.assertTrue("Client didn't reconnect to quorate ensemble (state was" + - lastEvent.getState() + ")", - (KeeperState.SyncConnected==lastEvent.getState() || - KeeperState.Expired==lastEvent.getState())); + assertTrue("Client didn't reconnect to quorate ensemble (state was" + + lastEvent.getState() + + ")", (KeeperState.SyncConnected == lastEvent.getState() + || KeeperState.Expired == lastEvent.getState())); LOG.info("perform a revalidation test"); int leaderProxyPort = PortAssignment.unique(); @@ -363,12 +355,10 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort); latch = new CountDownLatch(1); - ZooKeeper client = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), - ClientBase.CONNECTION_TIMEOUT, this); + ZooKeeper client = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), ClientBase.CONNECTION_TIMEOUT, this); latch.await(); - client.create("/revalidtest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL); - Assert.assertNotNull("Read-after write failed", client.exists("/revalidtest", null)); + client.create("/revalidtest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + assertNotNull("Read-after write failed", client.exists("/revalidtest", null)); latch = new CountDownLatch(2); PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS); @@ -378,7 +368,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ // ignore? } latch.await(); - Assert.assertEquals(new String(client.getData("/revalidtest", null, null)), "test"); + assertEquals(new String(client.getData("/revalidtest", null, null)), "test"); client.close(); obsPF.shutdown(); @@ -389,9 +379,9 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ public void testRevalidation() throws Exception { setUp(-1); q3.start(); - Assert.assertTrue("waiting for server 3 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); final int leaderProxyPort = PortAssignment.unique(); final int obsProxyPort = PortAssignment.unique(); @@ -399,12 +389,10 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort); latch = new CountDownLatch(1); - zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), - ClientBase.CONNECTION_TIMEOUT, this); + zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), ClientBase.CONNECTION_TIMEOUT, this); latch.await(); - zk.create("/revalidtest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL); - Assert.assertNotNull("Read-after write failed", zk.exists("/revalidtest", null)); + zk.create("/revalidtest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + assertNotNull("Read-after write failed", zk.exists("/revalidtest", null)); latch = new CountDownLatch(2); PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS); @@ -414,7 +402,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ // ignore? } latch.await(); - Assert.assertEquals(new String(zk.getData("/revalidtest", null, null)), "test"); + assertEquals(new String(zk.getData("/revalidtest", null, null)), "test"); obsPF.shutdown(); shutdown(); @@ -424,23 +412,22 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ public void testInOrderCommits() throws Exception { setUp(-1); - zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, - ClientBase.CONNECTION_TIMEOUT, null); + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, null); for (int i = 0; i < 10; i++) { - zk.create("/bulk" + i, ("Initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/bulk" + + i, ("Initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.close(); q3.start(); - Assert.assertTrue("waiting for observer to be up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for observer to be up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); latch = new CountDownLatch(1); - zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, - ClientBase.CONNECTION_TIMEOUT, this); + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, this); latch.await(); - Assert.assertEquals(zk.getState(), States.CONNECTED); + assertEquals(zk.getState(), States.CONNECTED); zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); final long zxid = q1.getQuorumPeer().getLastLoggedZxid(); @@ -452,11 +439,9 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } }, 30); - ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, - ClientBase.CONNECTION_TIMEOUT, this); + ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this); int followerPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP1 : CLIENT_PORT_QP2; - ZooKeeper fZk = new ZooKeeper("127.0.0.1:" + followerPort, - ClientBase.CONNECTION_TIMEOUT, this); + ZooKeeper fZk = new ZooKeeper("127.0.0.1:" + followerPort, ClientBase.CONNECTION_TIMEOUT, this); final int numTransactions = 10001; CountDownLatch gate = new CountDownLatch(1); CountDownLatch oAsyncLatch = new CountDownLatch(numTransactions); @@ -488,9 +473,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } @Test - public void testAdminCommands() throws IOException, MBeanException, - InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException, - AttributeNotFoundException, InvalidAttributeValueException, KeeperException { + public void testAdminCommands() throws IOException, MBeanException, InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException, AttributeNotFoundException, InvalidAttributeValueException, KeeperException { // flush all beans, then start for (ZKMBeanInfo beanInfo : MBeanRegistry.getInstance().getRegisteredBeans()) { MBeanRegistry.getInstance().unregister(beanInfo); @@ -499,76 +482,72 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ JMXEnv.setUp(); setUp(-1); q3.start(); - Assert.assertTrue("waiting for observer to be up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for observer to be up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); // Assert that commands are getting forwarded correctly - zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, - ClientBase.CONNECTION_TIMEOUT, this); - zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - Assert.assertEquals(new String(zk.getData("/obstest", null, null)), "test"); + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this); + zk.create("/obstest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEquals(new String(zk.getData("/obstest", null, null)), "test"); // test stats collection final Map<String, String> emptyMap = Collections.emptyMap(); Map<String, Object> stats = Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap(); - Assert.assertTrue("observer not emitting observer_master_id", stats.containsKey("observer_master_id")); + assertTrue("observer not emitting observer_master_id", stats.containsKey("observer_master_id")); // check the stats for the first peer if (testObserverMaster) { if (q1.getQuorumPeer().leader == null) { - Assert.assertEquals(Integer.valueOf(1), q1.getQuorumPeer().getSynced_observers_metric()); + assertEquals(Integer.valueOf(1), q1.getQuorumPeer().getSynced_observers_metric()); } else { - Assert.assertEquals(Integer.valueOf(0), q1.getQuorumPeer().getSynced_observers_metric()); + assertEquals(Integer.valueOf(0), q1.getQuorumPeer().getSynced_observers_metric()); } } else { if (q1.getQuorumPeer().leader == null) { - Assert.assertNull(q1.getQuorumPeer().getSynced_observers_metric()); + assertNull(q1.getQuorumPeer().getSynced_observers_metric()); } else { - Assert.assertEquals(Integer.valueOf(1), q1.getQuorumPeer().getSynced_observers_metric()); + assertEquals(Integer.valueOf(1), q1.getQuorumPeer().getSynced_observers_metric()); } } // check the stats for the second peer if (testObserverMaster) { if (q2.getQuorumPeer().leader == null) { - Assert.assertEquals(Integer.valueOf(1), q2.getQuorumPeer().getSynced_observers_metric()); + assertEquals(Integer.valueOf(1), q2.getQuorumPeer().getSynced_observers_metric()); } else { - Assert.assertEquals(Integer.valueOf(0), q2.getQuorumPeer().getSynced_observers_metric()); + assertEquals(Integer.valueOf(0), q2.getQuorumPeer().getSynced_observers_metric()); } } else { if (q2.getQuorumPeer().leader == null) { - Assert.assertNull(q2.getQuorumPeer().getSynced_observers_metric()); + assertNull(q2.getQuorumPeer().getSynced_observers_metric()); } else { - Assert.assertEquals(Integer.valueOf(1), q2.getQuorumPeer().getSynced_observers_metric()); + assertEquals(Integer.valueOf(1), q2.getQuorumPeer().getSynced_observers_metric()); } } // test admin commands for disconnection ObjectName connBean = null; for (ObjectName bean : JMXEnv.conn().queryNames(new ObjectName(MBeanRegistry.DOMAIN + ":*"), null)) { - if (bean.getCanonicalName().contains("Learner_Connections") && - bean.getCanonicalName().contains("id:" + q3.getQuorumPeer().getId())) { + if (bean.getCanonicalName().contains("Learner_Connections") && bean.getCanonicalName().contains("id:" + + q3.getQuorumPeer().getId())) { connBean = bean; break; } } - Assert.assertNotNull("could not find connection bean", connBean); + assertNotNull("could not find connection bean", connBean); latch = new CountDownLatch(1); JMXEnv.conn().invoke(connBean, "terminateConnection", new Object[0], null); - Assert.assertTrue("server failed to disconnect on terminate", - latch.await(CONNECTION_TIMEOUT/2, TimeUnit.MILLISECONDS)); - Assert.assertTrue("waiting for server 3 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); - - final String obsBeanName = - String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer", - q3.getQuorumPeer().getId(), q3.getQuorumPeer().getId()); + assertTrue("server failed to disconnect on terminate", latch.await(CONNECTION_TIMEOUT + / 2, TimeUnit.MILLISECONDS)); + assertTrue( + "waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); + + final String obsBeanName = String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer", q3.getQuorumPeer().getId(), q3.getQuorumPeer().getId()); Set<ObjectName> names = JMXEnv.conn().queryNames(new ObjectName(obsBeanName), null); - Assert.assertEquals("expecting singular observer bean", 1, names.size()); + assertEquals("expecting singular observer bean", 1, names.size()); ObjectName obsBean = names.iterator().next(); if (testObserverMaster) { @@ -576,20 +555,18 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ long observerMasterId = q3.getQuorumPeer().observer.getLearnerMasterId(); latch = new CountDownLatch(1); JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - observerMasterId))); - Assert.assertTrue("server failed to disconnect on terminate", - latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)); - Assert.assertTrue("waiting for server 3 being up", - ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, - CONNECTION_TIMEOUT)); + assertTrue("server failed to disconnect on terminate", latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue( + "waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT)); } else { // show we get an error final long leaderId = q1.getQuorumPeer().leader == null ? 2 : 1; try { JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - leaderId))); - Assert.fail("should have seen an exception on previous command"); + fail("should have seen an exception on previous command"); } catch (RuntimeMBeanException e) { - Assert.assertEquals("mbean failed for the wrong reason", - IllegalArgumentException.class, e.getCause().getClass()); + assertEquals("mbean failed for the wrong reason", IllegalArgumentException.class, e.getCause().getClass()); } } @@ -598,26 +575,26 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } private String createServerString(String type, long serverId, int clientPort) { - return "server." + serverId + "=127.0.0.1:" + - PortAssignment.unique() + ":" + - PortAssignment.unique() + ":" + - type + ";" + clientPort; + return "server." + serverId + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":" + type + ";" + clientPort; } private void waitServerUp(int clientPort) { - Assert.assertTrue("waiting for server being up", - ClientBase.waitForServerUp("127.0.0.1:" + clientPort, - CONNECTION_TIMEOUT)); + assertTrue( + "waiting for server being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT)); } private ZooKeeperAdmin createAdmin(int clientPort) throws IOException { - System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", - "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); + System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); QuorumPeerConfig.setReconfigEnabled(true); - ZooKeeperAdmin admin = new ZooKeeperAdmin("127.0.0.1:" + clientPort, - ClientBase.CONNECTION_TIMEOUT, new Watcher() { - public void process(WatchedEvent event) {} - }); + ZooKeeperAdmin admin = new ZooKeeperAdmin( + "127.0.0.1:" + clientPort, + ClientBase.CONNECTION_TIMEOUT, + new Watcher() { + public void process(WatchedEvent event) { + + } + }); admin.addAuthInfo("digest", "super:test".getBytes()); return admin; } @@ -625,8 +602,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ // This test is known to be flaky and fail due to "reconfig already in progress". // TODO: Investigate intermittent testDynamicReconfig failures. // @Test - public void testDynamicReconfig() throws InterruptedException, IOException, - KeeperException { + public void testDynamicReconfig() throws InterruptedException, IOException, KeeperException { if (!testObserverMaster) { return; } @@ -643,57 +619,53 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ int clientPort2 = PortAssignment.unique(); int omPort1 = PortAssignment.unique(); int omPort2 = PortAssignment.unique(); - String quorumCfgSection = - createServerString("participant", 1, clientPort1) + "\n" + - createServerString("participant", 2, clientPort2); - - MainThread s1 = new MainThread(1, clientPort1, quorumCfgSection, - String.format("observerMasterPort=%d%n",omPort1)); - MainThread s2 = new MainThread(2, clientPort2, quorumCfgSection, - String.format("observerMasterPort=%d%n", omPort2)); + String quorumCfgSection = createServerString("participant", 1, clientPort1) + + "\n" + + createServerString("participant", 2, clientPort2); + + MainThread s1 = new MainThread(1, clientPort1, quorumCfgSection, String.format("observerMasterPort=%d%n", omPort1)); + MainThread s2 = new MainThread(2, clientPort2, quorumCfgSection, String.format("observerMasterPort=%d%n", omPort2)); s1.start(); s2.start(); waitServerUp(clientPort1); waitServerUp(clientPort2); // create observer to follow non-leader observer master - long nonLeaderOMPort = s1.getQuorumPeer().leader == null ? omPort1 - : omPort2; + long nonLeaderOMPort = s1.getQuorumPeer().leader == null ? omPort1 : omPort2; int observerClientPort = PortAssignment.unique(); int observerId = 10; MainThread observer = new MainThread( - observerId, - observerClientPort, quorumCfgSection + "\n" + - createServerString("observer", observerId, - observerClientPort), - String.format("observerMasterPort=%d%n", nonLeaderOMPort)); + observerId, + observerClientPort, + quorumCfgSection + "\n" + createServerString("observer", observerId, observerClientPort), + String.format("observerMasterPort=%d%n", nonLeaderOMPort)); LOG.info("starting observer"); observer.start(); waitServerUp(observerClientPort); // create a client to the observer - final LinkedBlockingQueue<KeeperState> states = - new LinkedBlockingQueue<KeeperState>(); + final LinkedBlockingQueue<KeeperState> states = new LinkedBlockingQueue<KeeperState>(); ZooKeeper observerClient = new ZooKeeper( - "127.0.0.1:" + observerClientPort, - ClientBase.CONNECTION_TIMEOUT, new Watcher() { - @Override - public void process(WatchedEvent event) { - try { - states.put(event.getState()); - } catch (InterruptedException e) {} + "127.0.0.1:" + observerClientPort, + ClientBase.CONNECTION_TIMEOUT, + new Watcher() { + @Override + public void process(WatchedEvent event) { + try { + states.put(event.getState()); + } catch (InterruptedException e) { + } - }); + } + }); // wait for connected KeeperState state = states.poll(1000, TimeUnit.MILLISECONDS); - Assert.assertEquals(KeeperState.SyncConnected, state); + assertEquals(KeeperState.SyncConnected, state); // issue reconfig command ArrayList<String> newServers = new ArrayList<String>(); - String server = "server.3=127.0.0.1:" + PortAssignment.unique() - + ":" + PortAssignment.unique() + ":participant;localhost:" - + PortAssignment.unique(); + String server = "server.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;localhost:" + PortAssignment.unique(); newServers.add(server); ZooKeeperAdmin admin = createAdmin(clientPort1); ReconfigTest.reconfig(admin, newServers, null, null, -1); @@ -704,7 +676,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ // shouldn't be disconnected during reconfig, so expect to not // receive any new event state = states.poll(1000, TimeUnit.MILLISECONDS); - Assert.assertNull(state); + assertNull(state); admin.close(); observerClient.close(); @@ -725,6 +697,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } class AsyncWriter implements Runnable { + private final ZooKeeper client; private final int numTransactions; private final boolean issueSync; @@ -732,8 +705,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ private final String root; private final CountDownLatch gate; - AsyncWriter(ZooKeeper client, int numTransactions, boolean issueSync, CountDownLatch writerLatch, - String root, CountDownLatch gate) { + AsyncWriter(ZooKeeper client, int numTransactions, boolean issueSync, CountDownLatch writerLatch, String root, CountDownLatch gate) { this.client = client; this.numTransactions = numTransactions; this.issueSync = issueSync; @@ -754,17 +726,16 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } for (int i = 0; i < numTransactions; i++) { final boolean pleaseLog = i % 100 == 0; - client.create(root + i, "inner thread".getBytes(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, - Object ctx, String name) { - writerLatch.countDown(); - if (pleaseLog) { - LOG.info("wrote {}", path); - } - } - }, null); + client.create(root + + i, "inner thread".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, String name) { + writerLatch.countDown(); + if (pleaseLog) { + LOG.info("wrote {}", path); + } + } + }, null); if (pleaseLog) { LOG.info("async wrote {}{}", root, i); if (issueSync) { @@ -773,5 +744,7 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ } } } + } + } |