diff options
Diffstat (limited to 'zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java')
-rw-r--r-- | zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java | 219 |
1 files changed, 107 insertions, 112 deletions
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java index 20388d890..f39548b02 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,13 +17,16 @@ */ package org.apache.zookeeper.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -40,7 +43,6 @@ import org.apache.zookeeper.server.quorum.Leader; import org.apache.zookeeper.server.quorum.LearnerHandler; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -48,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class QuorumTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(QuorumTest.class); public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; @@ -82,9 +85,7 @@ public class QuorumTest extends ZKTestCase { } @Test - public void testSequentialNodeNames() - throws IOException, InterruptedException, KeeperException - { + public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException { ct.testSequentialNodeNames(); } @@ -94,38 +95,34 @@ public class QuorumTest extends ZKTestCase { } @Test - public void testClientwithoutWatcherObj() throws IOException, - InterruptedException, KeeperException - { + public void testClientwithoutWatcherObj() throws IOException, InterruptedException, KeeperException { ct.testClientwithoutWatcherObj(); } @Test - public void testClientWithWatcherObj() throws IOException, - InterruptedException, KeeperException - { + public void testClientWithWatcherObj() throws IOException, InterruptedException, KeeperException { ct.testClientWithWatcherObj(); } @Test public void testGetView() { - Assert.assertEquals(5,qb.s1.getView().size()); - Assert.assertEquals(5,qb.s2.getView().size()); - Assert.assertEquals(5,qb.s3.getView().size()); - Assert.assertEquals(5,qb.s4.getView().size()); - Assert.assertEquals(5,qb.s5.getView().size()); + assertEquals(5, qb.s1.getView().size()); + assertEquals(5, qb.s2.getView().size()); + assertEquals(5, qb.s3.getView().size()); + assertEquals(5, qb.s4.getView().size()); + assertEquals(5, qb.s5.getView().size()); } @Test public void testViewContains() { // Test view contains self - Assert.assertTrue(qb.s1.viewContains(qb.s1.getId())); + assertTrue(qb.s1.viewContains(qb.s1.getId())); // Test view contains other servers - Assert.assertTrue(qb.s1.viewContains(qb.s2.getId())); + assertTrue(qb.s1.viewContains(qb.s2.getId())); // Test view does not contain non-existant servers - Assert.assertFalse(qb.s1.viewContains(-1L)); + assertFalse(qb.s1.viewContains(-1L)); } volatile int counter = 0; @@ -134,19 +131,27 @@ public class QuorumTest extends ZKTestCase { public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { - }}); + } + }); zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Leader leader = qb.s1.leader; - if (leader == null) leader = qb.s2.leader; - if (leader == null) leader = qb.s3.leader; - if (leader == null) leader = qb.s4.leader; - if (leader == null) leader = qb.s5.leader; - Assert.assertNotNull(leader); - for(int i = 0; i < 5000; i++) { + if (leader == null) { + leader = qb.s2.leader; + } + if (leader == null) { + leader = qb.s3.leader; + } + if (leader == null) { + leader = qb.s4.leader; + } + if (leader == null) { + leader = qb.s5.leader; + } + assertNotNull(leader); + for (int i = 0; i < 5000; i++) { zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() { - public void processResult(int rc, String path, Object ctx, - Stat stat) { + public void processResult(int rc, String path, Object ctx, Stat stat) { counter++; if (rc != 0) { errors++; @@ -154,13 +159,12 @@ public class QuorumTest extends ZKTestCase { } }, null); } - for(LearnerHandler f : leader.getForwardingFollowers()) { + for (LearnerHandler f : leader.getForwardingFollowers()) { f.getSocket().shutdownInput(); } - for(int i = 0; i < 5000; i++) { + for (int i = 0; i < 5000; i++) { zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() { - public void processResult(int rc, String path, Object ctx, - Stat stat) { + public void processResult(int rc, String path, Object ctx, Stat stat) { counter++; if (rc != 0) { errors++; @@ -169,18 +173,16 @@ public class QuorumTest extends ZKTestCase { }, null); } // check if all the followers are alive - Assert.assertTrue(qb.s1.isAlive()); - Assert.assertTrue(qb.s2.isAlive()); - Assert.assertTrue(qb.s3.isAlive()); - Assert.assertTrue(qb.s4.isAlive()); - Assert.assertTrue(qb.s5.isAlive()); + assertTrue(qb.s1.isAlive()); + assertTrue(qb.s2.isAlive()); + assertTrue(qb.s3.isAlive()); + assertTrue(qb.s4.isAlive()); + assertTrue(qb.s5.isAlive()); zk.close(); } @Test - public void testMultipleWatcherObjs() throws IOException, - InterruptedException, KeeperException - { + public void testMultipleWatcherObjs() throws IOException, InterruptedException, KeeperException { ct.testMutipleWatcherObjs(); } @@ -194,42 +196,43 @@ public class QuorumTest extends ZKTestCase { */ @Test public void testSessionMoved() throws Exception { - String hostPorts[] = qb.hostPort.split(","); - DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], - ClientBase.CONNECTION_TIMEOUT, new Watcher() { + String[] hostPorts = qb.hostPort.split(","); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { - }}); + } + }); zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // we want to loop through the list twice - for(int i = 0; i < hostPorts.length*2; i++) { + for (int i = 0; i < hostPorts.length * 2; i++) { zk.dontReconnect(); // This should stomp the zk handle - DisconnectableZooKeeper zknew = - new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length], - ClientBase.CONNECTION_TIMEOUT, - new Watcher() {public void process(WatchedEvent event) { - }}, - zk.getSessionId(), - zk.getSessionPasswd()); + DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hostPorts[(i + 1) + % hostPorts.length], ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) { + } + }, zk.getSessionId(), zk.getSessionPasswd()); zknew.setData("/", new byte[1], -1); - final int result[] = new int[1]; + final int[] result = new int[1]; result[0] = Integer.MAX_VALUE; zknew.sync("/", new AsyncCallback.VoidCallback() { - public void processResult(int rc, String path, Object ctx) { - synchronized(result) { result[0] = rc; result.notify(); } + public void processResult(int rc, String path, Object ctx) { + synchronized (result) { + result[0] = rc; + result.notify(); } - }, null); - synchronized(result) { - if(result[0] == Integer.MAX_VALUE) { + } + }, null); + synchronized (result) { + if (result[0] == Integer.MAX_VALUE) { result.wait(5000); } } - LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]); - Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue()); + LOG.info(hostPorts[(i + 1) % hostPorts.length] + " Sync returned " + result[0]); + assertTrue(result[0] == KeeperException.Code.OK.intValue()); try { zk.setData("/", new byte[1], -1); - Assert.fail("Should have lost the connection"); - } catch(KeeperException.ConnectionLossException e) { + fail("Should have lost the connection"); + } catch (KeeperException.ConnectionLossException e) { } zk = zknew; } @@ -237,12 +240,14 @@ public class QuorumTest extends ZKTestCase { } private static class DiscoWatcher implements Watcher { + volatile boolean zkDisco = false; public void process(WatchedEvent event) { if (event.getState() == KeeperState.Disconnected) { zkDisco = true; } } + } /** @@ -255,35 +260,26 @@ public class QuorumTest extends ZKTestCase { */ @Test public void testSessionMovedWithMultiOp() throws Exception { - String hostPorts[] = qb.hostPort.split(","); - DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], - ClientBase.CONNECTION_TIMEOUT, new Watcher() { + String[] hostPorts = qb.hostPort.split(","); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { - }}); - zk.multi(Arrays.asList( - Op.create("/testSessionMovedWithMultiOp", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - )); + } + }); + zk.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL))); // session moved to the next server - ZooKeeper zknew = new ZooKeeper(hostPorts[1], - ClientBase.CONNECTION_TIMEOUT, - new Watcher() {public void process(WatchedEvent event) { - }}, - zk.getSessionId(), - zk.getSessionPasswd()); - zknew.multi(Arrays.asList( - Op.create("/testSessionMovedWithMultiOp-1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - )); + ZooKeeper zknew = new ZooKeeper(hostPorts[1], ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) { + } + }, zk.getSessionId(), zk.getSessionPasswd()); + zknew.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp-1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL))); // try to issue the multi op again from the old connection // expect to have ConnectionLossException instead of keep // getting SessionMovedException try { - zk.multi(Arrays.asList( - Op.create("/testSessionMovedWithMultiOp-Failed", - new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - )); - Assert.fail("Should have lost the connection"); + zk.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp-Failed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL))); + fail("Should have lost the connection"); } catch (KeeperException.ConnectionLossException e) { } @@ -298,22 +294,19 @@ public class QuorumTest extends ZKTestCase { @Test @Ignore public void testSessionMove() throws Exception { - String hps[] = qb.hostPort.split(","); + String[] hps = qb.hostPort.split(","); DiscoWatcher oldWatcher = new DiscoWatcher(); - DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0], - ClientBase.CONNECTION_TIMEOUT, oldWatcher); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0], ClientBase.CONNECTION_TIMEOUT, oldWatcher); zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zk.dontReconnect(); // This should stomp the zk handle DiscoWatcher watcher = new DiscoWatcher(); - DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1], - ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(), - zk.getSessionPasswd()); + DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(), zk.getSessionPasswd()); zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); try { zk.create("/t3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - Assert.fail("Should have lost the connection"); - } catch(KeeperException.ConnectionLossException e) { + fail("Should have lost the connection"); + } catch (KeeperException.ConnectionLossException e) { // wait up to 30 seconds for the disco to be delivered for (int i = 0; i < 30; i++) { if (oldWatcher.zkDisco) { @@ -321,21 +314,19 @@ public class QuorumTest extends ZKTestCase { } Thread.sleep(1000); } - Assert.assertTrue(oldWatcher.zkDisco); + assertTrue(oldWatcher.zkDisco); } ArrayList<ZooKeeper> toClose = new ArrayList<ZooKeeper>(); toClose.add(zknew); // Let's just make sure it can still move - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { zknew.dontReconnect(); - zknew = new DisconnectableZooKeeper(hps[1], - ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(), - zk.getSessionId(), zk.getSessionPasswd()); + zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(), zk.getSessionId(), zk.getSessionPasswd()); toClose.add(zknew); - zknew.create("/t-"+i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + zknew.create("/t-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } - for (ZooKeeper z: toClose) { + for (ZooKeeper z : toClose) { z.close(); } zk.close(); @@ -351,8 +342,9 @@ public class QuorumTest extends ZKTestCase { qu.startQuorum(); int index = 1; - while(qu.getPeer(index).peer.leader == null) + while (qu.getPeer(index).peer.leader == null) { index++; + } // break the quorum qu.shutdown(index); @@ -363,13 +355,14 @@ public class QuorumTest extends ZKTestCase { // Connect the client after services are restarted (otherwise we would get // SessionExpiredException as the previous local session was not persisted). ZooKeeper zk = new ZooKeeper( - "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), - ClientBase.CONNECTION_TIMEOUT, watcher); + "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, + watcher); - try{ + try { watcher.waitForConnected(CONNECTION_TIMEOUT); - } catch(TimeoutException e) { - Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds."); + } catch (TimeoutException e) { + fail("client could not connect to reestablished quorum: giving up after 30+ seconds."); } zk.close(); @@ -393,23 +386,25 @@ public class QuorumTest extends ZKTestCase { qu.startQuorum(); int index = 1; - while(qu.getPeer(index).peer.leader == null) + while (qu.getPeer(index).peer.leader == null) { index++; + } ZooKeeper zk = new ZooKeeper( - "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), - ClientBase.CONNECTION_TIMEOUT, watcher); + "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, + watcher); watcher.waitForConnected(CONNECTION_TIMEOUT); zk.multi(Arrays.asList( Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), - Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - )); + Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT))); zk.getData("/multi0", false, null); zk.getData("/multi1", false, null); zk.getData("/multi2", false, null); zk.close(); } + } |