summaryrefslogtreecommitdiff
path: root/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java')
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java219
1 files changed, 107 insertions, 112 deletions
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
index 20388d890..f39548b02 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,13 +17,16 @@
*/
package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -40,7 +43,6 @@ import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -48,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QuorumTest extends ZKTestCase {
+
private static final Logger LOG = LoggerFactory.getLogger(QuorumTest.class);
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
@@ -82,9 +85,7 @@ public class QuorumTest extends ZKTestCase {
}
@Test
- public void testSequentialNodeNames()
- throws IOException, InterruptedException, KeeperException
- {
+ public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException {
ct.testSequentialNodeNames();
}
@@ -94,38 +95,34 @@ public class QuorumTest extends ZKTestCase {
}
@Test
- public void testClientwithoutWatcherObj() throws IOException,
- InterruptedException, KeeperException
- {
+ public void testClientwithoutWatcherObj() throws IOException, InterruptedException, KeeperException {
ct.testClientwithoutWatcherObj();
}
@Test
- public void testClientWithWatcherObj() throws IOException,
- InterruptedException, KeeperException
- {
+ public void testClientWithWatcherObj() throws IOException, InterruptedException, KeeperException {
ct.testClientWithWatcherObj();
}
@Test
public void testGetView() {
- Assert.assertEquals(5,qb.s1.getView().size());
- Assert.assertEquals(5,qb.s2.getView().size());
- Assert.assertEquals(5,qb.s3.getView().size());
- Assert.assertEquals(5,qb.s4.getView().size());
- Assert.assertEquals(5,qb.s5.getView().size());
+ assertEquals(5, qb.s1.getView().size());
+ assertEquals(5, qb.s2.getView().size());
+ assertEquals(5, qb.s3.getView().size());
+ assertEquals(5, qb.s4.getView().size());
+ assertEquals(5, qb.s5.getView().size());
}
@Test
public void testViewContains() {
// Test view contains self
- Assert.assertTrue(qb.s1.viewContains(qb.s1.getId()));
+ assertTrue(qb.s1.viewContains(qb.s1.getId()));
// Test view contains other servers
- Assert.assertTrue(qb.s1.viewContains(qb.s2.getId()));
+ assertTrue(qb.s1.viewContains(qb.s2.getId()));
// Test view does not contain non-existant servers
- Assert.assertFalse(qb.s1.viewContains(-1L));
+ assertFalse(qb.s1.viewContains(-1L));
}
volatile int counter = 0;
@@ -134,19 +131,27 @@ public class QuorumTest extends ZKTestCase {
public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
- }});
+ }
+ });
zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Leader leader = qb.s1.leader;
- if (leader == null) leader = qb.s2.leader;
- if (leader == null) leader = qb.s3.leader;
- if (leader == null) leader = qb.s4.leader;
- if (leader == null) leader = qb.s5.leader;
- Assert.assertNotNull(leader);
- for(int i = 0; i < 5000; i++) {
+ if (leader == null) {
+ leader = qb.s2.leader;
+ }
+ if (leader == null) {
+ leader = qb.s3.leader;
+ }
+ if (leader == null) {
+ leader = qb.s4.leader;
+ }
+ if (leader == null) {
+ leader = qb.s5.leader;
+ }
+ assertNotNull(leader);
+ for (int i = 0; i < 5000; i++) {
zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
- public void processResult(int rc, String path, Object ctx,
- Stat stat) {
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
counter++;
if (rc != 0) {
errors++;
@@ -154,13 +159,12 @@ public class QuorumTest extends ZKTestCase {
}
}, null);
}
- for(LearnerHandler f : leader.getForwardingFollowers()) {
+ for (LearnerHandler f : leader.getForwardingFollowers()) {
f.getSocket().shutdownInput();
}
- for(int i = 0; i < 5000; i++) {
+ for (int i = 0; i < 5000; i++) {
zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
- public void processResult(int rc, String path, Object ctx,
- Stat stat) {
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
counter++;
if (rc != 0) {
errors++;
@@ -169,18 +173,16 @@ public class QuorumTest extends ZKTestCase {
}, null);
}
// check if all the followers are alive
- Assert.assertTrue(qb.s1.isAlive());
- Assert.assertTrue(qb.s2.isAlive());
- Assert.assertTrue(qb.s3.isAlive());
- Assert.assertTrue(qb.s4.isAlive());
- Assert.assertTrue(qb.s5.isAlive());
+ assertTrue(qb.s1.isAlive());
+ assertTrue(qb.s2.isAlive());
+ assertTrue(qb.s3.isAlive());
+ assertTrue(qb.s4.isAlive());
+ assertTrue(qb.s5.isAlive());
zk.close();
}
@Test
- public void testMultipleWatcherObjs() throws IOException,
- InterruptedException, KeeperException
- {
+ public void testMultipleWatcherObjs() throws IOException, InterruptedException, KeeperException {
ct.testMutipleWatcherObjs();
}
@@ -194,42 +196,43 @@ public class QuorumTest extends ZKTestCase {
*/
@Test
public void testSessionMoved() throws Exception {
- String hostPorts[] = qb.hostPort.split(",");
- DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
- ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ String[] hostPorts = qb.hostPort.split(",");
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
- }});
+ }
+ });
zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// we want to loop through the list twice
- for(int i = 0; i < hostPorts.length*2; i++) {
+ for (int i = 0; i < hostPorts.length * 2; i++) {
zk.dontReconnect();
// This should stomp the zk handle
- DisconnectableZooKeeper zknew =
- new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length],
- ClientBase.CONNECTION_TIMEOUT,
- new Watcher() {public void process(WatchedEvent event) {
- }},
- zk.getSessionId(),
- zk.getSessionPasswd());
+ DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hostPorts[(i + 1)
+ % hostPorts.length], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ public void process(WatchedEvent event) {
+ }
+ }, zk.getSessionId(), zk.getSessionPasswd());
zknew.setData("/", new byte[1], -1);
- final int result[] = new int[1];
+ final int[] result = new int[1];
result[0] = Integer.MAX_VALUE;
zknew.sync("/", new AsyncCallback.VoidCallback() {
- public void processResult(int rc, String path, Object ctx) {
- synchronized(result) { result[0] = rc; result.notify(); }
+ public void processResult(int rc, String path, Object ctx) {
+ synchronized (result) {
+ result[0] = rc;
+ result.notify();
}
- }, null);
- synchronized(result) {
- if(result[0] == Integer.MAX_VALUE) {
+ }
+ }, null);
+ synchronized (result) {
+ if (result[0] == Integer.MAX_VALUE) {
result.wait(5000);
}
}
- LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
- Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
+ LOG.info(hostPorts[(i + 1) % hostPorts.length] + " Sync returned " + result[0]);
+ assertTrue(result[0] == KeeperException.Code.OK.intValue());
try {
zk.setData("/", new byte[1], -1);
- Assert.fail("Should have lost the connection");
- } catch(KeeperException.ConnectionLossException e) {
+ fail("Should have lost the connection");
+ } catch (KeeperException.ConnectionLossException e) {
}
zk = zknew;
}
@@ -237,12 +240,14 @@ public class QuorumTest extends ZKTestCase {
}
private static class DiscoWatcher implements Watcher {
+
volatile boolean zkDisco = false;
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Disconnected) {
zkDisco = true;
}
}
+
}
/**
@@ -255,35 +260,26 @@ public class QuorumTest extends ZKTestCase {
*/
@Test
public void testSessionMovedWithMultiOp() throws Exception {
- String hostPorts[] = qb.hostPort.split(",");
- DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
- ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ String[] hostPorts = qb.hostPort.split(",");
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
- }});
- zk.multi(Arrays.asList(
- Op.create("/testSessionMovedWithMultiOp", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
- ));
+ }
+ });
+ zk.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));
// session moved to the next server
- ZooKeeper zknew = new ZooKeeper(hostPorts[1],
- ClientBase.CONNECTION_TIMEOUT,
- new Watcher() {public void process(WatchedEvent event) {
- }},
- zk.getSessionId(),
- zk.getSessionPasswd());
- zknew.multi(Arrays.asList(
- Op.create("/testSessionMovedWithMultiOp-1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
- ));
+ ZooKeeper zknew = new ZooKeeper(hostPorts[1], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ public void process(WatchedEvent event) {
+ }
+ }, zk.getSessionId(), zk.getSessionPasswd());
+ zknew.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp-1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));
// try to issue the multi op again from the old connection
// expect to have ConnectionLossException instead of keep
// getting SessionMovedException
try {
- zk.multi(Arrays.asList(
- Op.create("/testSessionMovedWithMultiOp-Failed",
- new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
- ));
- Assert.fail("Should have lost the connection");
+ zk.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp-Failed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));
+ fail("Should have lost the connection");
} catch (KeeperException.ConnectionLossException e) {
}
@@ -298,22 +294,19 @@ public class QuorumTest extends ZKTestCase {
@Test
@Ignore
public void testSessionMove() throws Exception {
- String hps[] = qb.hostPort.split(",");
+ String[] hps = qb.hostPort.split(",");
DiscoWatcher oldWatcher = new DiscoWatcher();
- DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0],
- ClientBase.CONNECTION_TIMEOUT, oldWatcher);
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0], ClientBase.CONNECTION_TIMEOUT, oldWatcher);
zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.dontReconnect();
// This should stomp the zk handle
DiscoWatcher watcher = new DiscoWatcher();
- DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
- ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(),
- zk.getSessionPasswd());
+ DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(), zk.getSessionPasswd());
zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
try {
zk.create("/t3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- Assert.fail("Should have lost the connection");
- } catch(KeeperException.ConnectionLossException e) {
+ fail("Should have lost the connection");
+ } catch (KeeperException.ConnectionLossException e) {
// wait up to 30 seconds for the disco to be delivered
for (int i = 0; i < 30; i++) {
if (oldWatcher.zkDisco) {
@@ -321,21 +314,19 @@ public class QuorumTest extends ZKTestCase {
}
Thread.sleep(1000);
}
- Assert.assertTrue(oldWatcher.zkDisco);
+ assertTrue(oldWatcher.zkDisco);
}
ArrayList<ZooKeeper> toClose = new ArrayList<ZooKeeper>();
toClose.add(zknew);
// Let's just make sure it can still move
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
zknew.dontReconnect();
- zknew = new DisconnectableZooKeeper(hps[1],
- ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(),
- zk.getSessionId(), zk.getSessionPasswd());
+ zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(), zk.getSessionId(), zk.getSessionPasswd());
toClose.add(zknew);
- zknew.create("/t-"+i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ zknew.create("/t-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
- for (ZooKeeper z: toClose) {
+ for (ZooKeeper z : toClose) {
z.close();
}
zk.close();
@@ -351,8 +342,9 @@ public class QuorumTest extends ZKTestCase {
qu.startQuorum();
int index = 1;
- while(qu.getPeer(index).peer.leader == null)
+ while (qu.getPeer(index).peer.leader == null) {
index++;
+ }
// break the quorum
qu.shutdown(index);
@@ -363,13 +355,14 @@ public class QuorumTest extends ZKTestCase {
// Connect the client after services are restarted (otherwise we would get
// SessionExpiredException as the previous local session was not persisted).
ZooKeeper zk = new ZooKeeper(
- "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
- ClientBase.CONNECTION_TIMEOUT, watcher);
+ "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(),
+ ClientBase.CONNECTION_TIMEOUT,
+ watcher);
- try{
+ try {
watcher.waitForConnected(CONNECTION_TIMEOUT);
- } catch(TimeoutException e) {
- Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
+ } catch (TimeoutException e) {
+ fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
}
zk.close();
@@ -393,23 +386,25 @@ public class QuorumTest extends ZKTestCase {
qu.startQuorum();
int index = 1;
- while(qu.getPeer(index).peer.leader == null)
+ while (qu.getPeer(index).peer.leader == null) {
index++;
+ }
ZooKeeper zk = new ZooKeeper(
- "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
- ClientBase.CONNECTION_TIMEOUT, watcher);
+ "127.0.0.1:" + qu.getPeer((index == 1) ? 2 : 1).peer.getClientPort(),
+ ClientBase.CONNECTION_TIMEOUT,
+ watcher);
watcher.waitForConnected(CONNECTION_TIMEOUT);
zk.multi(Arrays.asList(
Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
- Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
- ));
+ Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)));
zk.getData("/multi0", false, null);
zk.getData("/multi1", false, null);
zk.getData("/multi2", false, null);
zk.close();
}
+
}