summaryrefslogtreecommitdiff
path: root/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
diff options
context:
space:
mode:
authortison <wander4096@gmail.com>2019-08-17 08:13:15 -0700
committerMichael Han <hanm@apache.org>2019-08-17 08:13:15 -0700
commitfe940cdd8fb23ba09684cefb73233d570f4a20fa (patch)
tree28de6768b6ef6f233840636cb50a1cd35512e319 /zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
parent04cc5cae1e7d4b007490e68109a676b88ce790a3 (diff)
downloadzookeeper-fe940cdd8fb23ba09684cefb73233d570f4a20fa.tar.gz
ZOOKEEPER-3475: Enable Checkstyle configuration on zookeeper-server
- [x] org/apache/zookeeper - [x] org/apache/zookeeper/admin - [x] org/apache/zookeeper/cli - [x] org/apache/zookeeper/client - [x] org/apache/zookeeper/common - [x] org/apache/zookeeper/jmx - [x] org/apache/zookeeper/metrics - [x] org/apache/zookeeper/metrics/impl - [x] org/apache/zookeeper/server - [x] org/apache/zookeeper/server/admin - [x] org/apache/zookeeper/server/auth - [x] org/apache/zookeeper/server/command - [x] org/apache/zookeeper/server/metric - [x] org/apache/zookeeper/server/persistence - [x] org/apache/zookeeper/server/quorum - [x] org/apache/zookeeper/server/quorum/auth - [x] org/apache/zookeeper/server/quorum/flexible - [x] org/apache/zookeeper/server/util - [x] org/apache/zookeeper/server/watch - [x] org/apache/zookeeper/test - [x] org/apache/zookeeper/util - [x] org/apache/zookeeper/version/util Author: tison <wander4096@gmail.com> Reviewers: Brian Nixon <nixon@fb.com>, Enrico Olivelli <eolivelli@gmail.com>, Michael Han <hanm@apache.org> Closes #1049 from TisonKun/ZOOKEEPER-3475
Diffstat (limited to 'zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java')
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java390
1 files changed, 190 insertions, 200 deletions
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index 95889ef1d..7d342a0d7 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,9 @@
package org.apache.zookeeper.server.quorum;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -28,7 +31,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
@@ -44,15 +46,14 @@ import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.WorkerService;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommitProcessorConcurrencyTest extends ZKTestCase {
- protected static final Logger LOG = LoggerFactory
- .getLogger(CommitProcessorConcurrencyTest.class);
+
+ protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
BlockingQueue<Request> processedRequests;
MockCommitProcessor processor;
@@ -74,6 +75,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
// This queue is infinite if we use "poll" to get requests, but returns a
// finite size when asked.
class MockRequestsQueue extends LinkedBlockingQueue<Request> {
+
private static final long serialVersionUID = 1L;
int readReqId = 0;
@@ -81,12 +83,10 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
public Request poll() {
readReqId++;
try {
- return newRequest(new GetDataRequest("/", false),
- OpCode.getData, readReqId % 50, readReqId);
+ return newRequest(new GetDataRequest("/", false), OpCode.getData, readReqId % 50, readReqId);
} catch (IOException e) {
e.printStackTrace();
}
- ;
return null;
}
@@ -94,13 +94,14 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
public int size() {
return 42;
}
+
}
class MockCommitProcessor extends CommitProcessor {
+
MockCommitProcessor() {
super(new RequestProcessor() {
- public void processRequest(Request request)
- throws RequestProcessorException {
+ public void processRequest(Request request) throws RequestProcessorException {
processedRequests.offer(request);
}
@@ -110,20 +111,19 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
@Override
public void notifyStopping(String threadName, int errorCode) {
- Assert.fail("Commit processor crashed " + errorCode);
+ fail("Commit processor crashed " + errorCode);
}
});
}
public void initThreads(int poolSize) {
this.stopped = false;
- this.workerPool = new WorkerService("CommitProcWork", poolSize,
- true);
+ this.workerPool = new WorkerService("CommitProcWork", poolSize, true);
}
+
}
- private Request newRequest(Record rec, int type, int sessionId, int xid)
- throws IOException {
+ private Request newRequest(Record rec, int type, int sessionId, int xid) throws IOException {
ByteArrayOutputStream boas = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
rec.serialize(boa, "request");
@@ -137,15 +137,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
* according to the order of the session (first read, then the write).
*/
@Test
- public void committedAndUncommittedOfTheSameSessionRaceTest()
- throws Exception {
+ public void committedAndUncommittedOfTheSameSessionRaceTest() throws Exception {
final String path = "/testCvsUCRace";
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x0, 0);
- Request writeReq = newRequest(
- new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0,
- 1);
+ Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x0, 0);
+ Request writeReq = newRequest(new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0, 1);
processor.committedRequests.add(writeReq);
processor.queuedRequests.add(readReq);
@@ -156,17 +152,13 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.stoppedMainLoop = true;
processor.run();
- Assert.assertTrue(
- "Request was not processed " + readReq + " instead "
- + processedRequests.peek(),
- processedRequests.peek() != null
- && processedRequests.peek().equals(readReq));
+ assertTrue(
+ "Request was not processed " + readReq + " instead " + processedRequests.peek(),
+ processedRequests.peek() != null && processedRequests.peek().equals(readReq));
processedRequests.poll();
- Assert.assertTrue(
- "Request was not processed " + writeReq + " instead "
- + processedRequests.peek(),
- processedRequests.peek() != null
- && processedRequests.peek().equals(writeReq));
+ assertTrue(
+ "Request was not processed " + writeReq + " instead " + processedRequests.peek(),
+ processedRequests.peek() != null && processedRequests.peek().equals(writeReq));
}
/**
@@ -178,24 +170,30 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
* uncommited requests, followed by the reads are processed.
*/
@Test
- public void processAsMuchUncommittedRequestsAsPossibleTest()
- throws Exception {
+ public void processAsMuchUncommittedRequestsAsPossibleTest() throws Exception {
final String path = "/testAsMuchAsPossible";
List<Request> shouldBeProcessed = new LinkedList<Request>();
Set<Request> shouldNotBeProcessed = new HashSet<Request>();
for (int sessionId = 1; sessionId <= 5; ++sessionId) {
for (int readReqId = 1; readReqId <= sessionId; ++readReqId) {
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, sessionId, readReqId);
+ Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionId, readReqId);
shouldBeProcessed.add(readReq);
processor.queuedRequests.add(readReq);
}
Request writeReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, sessionId, sessionId + 1);
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, sessionId, sessionId + 2);
+ new CreateRequest(
+ path,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ sessionId,
+ sessionId + 1);
+ Request readReq = newRequest(
+ new GetDataRequest(path, false),
+ OpCode.getData,
+ sessionId,
+ sessionId + 2);
processor.queuedRequests.add(writeReq);
processor.queuedWriteRequests.add(writeReq);
processor.queuedRequests.add(readReq);
@@ -211,10 +209,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
for (Request r : shouldBeProcessed) {
LOG.error("Did not process " + r);
}
- Assert.assertTrue("Not all requests were processed",
- shouldBeProcessed.isEmpty());
- Assert.assertFalse("Processed a wrong request",
- shouldNotBeProcessed.removeAll(processedRequests));
+ assertTrue("Not all requests were processed", shouldBeProcessed.isEmpty());
+ assertFalse("Processed a wrong request", shouldNotBeProcessed.removeAll(processedRequests));
}
/**
@@ -225,23 +221,22 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
* executed after the write, before any other write, along with new reads.
*/
@Test
- public void processAllFollowingUncommittedAfterFirstCommitTest()
- throws Exception {
+ public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception {
final String path = "/testUncommittedFollowingCommited";
Set<Request> shouldBeInPending = new HashSet<Request>();
Set<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
Request writeReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x1, 1);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x1,
+ 1);
processor.queuedRequests.add(writeReq);
processor.queuedWriteRequests.add(writeReq);
shouldBeInPending.add(writeReq);
for (int readReqId = 2; readReqId <= 5; ++readReqId) {
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x1, readReqId);
+ Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, readReqId);
processor.queuedRequests.add(readReq);
shouldBeInPending.add(readReq);
shouldBeProcessedAfterPending.add(readReq);
@@ -250,21 +245,15 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.stoppedMainLoop = true;
processor.run();
- Assert.assertTrue("Processed without waiting for commit",
- processedRequests.isEmpty());
- Assert.assertTrue("Did not handled all of queuedRequests' requests",
- processor.queuedRequests.isEmpty());
- Assert.assertTrue("Removed from blockedQueuedRequests before commit",
- !processor.queuedWriteRequests.isEmpty());
-
- shouldBeInPending
- .removeAll(processor.pendingRequests.get(writeReq.sessionId));
+ assertTrue("Processed without waiting for commit", processedRequests.isEmpty());
+ assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty());
+ assertTrue("Removed from blockedQueuedRequests before commit", !processor.queuedWriteRequests.isEmpty());
+
+ shouldBeInPending.removeAll(processor.pendingRequests.get(writeReq.sessionId));
for (Request r : shouldBeInPending) {
LOG.error("Should be in pending " + r);
}
- Assert.assertTrue(
- "Not all requests moved to pending from queuedRequests",
- shouldBeInPending.isEmpty());
+ assertTrue("Not all requests moved to pending from queuedRequests", shouldBeInPending.isEmpty());
processor.committedRequests.add(writeReq);
processor.stoppedMainLoop = true;
@@ -272,16 +261,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.initThreads(defaultSizeOfThreadPool);
Thread.sleep(500);
- Assert.assertTrue("Did not process committed request",
- processedRequests.peek() == writeReq);
- Assert.assertTrue("Did not process following read request",
- processedRequests.containsAll(shouldBeProcessedAfterPending));
- Assert.assertTrue("Did not process committed request",
- processor.committedRequests.isEmpty());
- Assert.assertTrue("Did not process committed request",
- processor.pendingRequests.isEmpty());
- Assert.assertTrue("Did not remove from blockedQueuedRequests",
- processor.queuedWriteRequests.isEmpty());
+ assertTrue("Did not process committed request", processedRequests.peek() == writeReq);
+ assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending));
+ assertTrue("Did not process committed request", processor.committedRequests.isEmpty());
+ assertTrue("Did not process committed request", processor.pendingRequests.isEmpty());
+ assertTrue("Did not remove from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty());
}
/**
@@ -299,30 +283,37 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
* in the last iteration, and all lists are empty.
*/
@Test
- public void processAllWritesMaxBatchSize()
- throws Exception {
+ public void processAllWritesMaxBatchSize() throws Exception {
final String path = "/processAllWritesMaxBatchSize";
HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
Request writeReq = newRequest(
- new CreateRequest(path + "_1", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x1, 1);
+ new CreateRequest(
+ path + "_1",
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x1,
+ 1);
processor.queuedRequests.add(writeReq);
processor.queuedWriteRequests.add(writeReq);
Request writeReq2 = newRequest(
- new CreateRequest(path + "_2", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x2, 1);
+ new CreateRequest(
+ path + "_2",
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x2,
+ 1);
processor.queuedRequests.add(writeReq2);
processor.queuedWriteRequests.add(writeReq2);
for (int readReqId = 2; readReqId <= 5; ++readReqId) {
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x1, readReqId);
- Request readReq2 = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x2, readReqId);
+ Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, readReqId);
+ Request readReq2 = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x2, readReqId);
processor.queuedRequests.add(readReq);
shouldBeProcessedAfterPending.add(readReq);
processor.queuedRequests.add(readReq2);
@@ -330,9 +321,14 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
}
Request writeReq3 = newRequest(
- new CreateRequest(path + "_3", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x2, 6);
+ new CreateRequest(
+ path + "_3",
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x2,
+ 6);
processor.queuedRequests.add(writeReq3);
processor.queuedWriteRequests.add(writeReq3);
@@ -341,16 +337,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.stoppedMainLoop = true;
CommitProcessor.setMaxCommitBatchSize(2);
processor.run();
- Assert.assertTrue("Processed without waiting for commit",
- processedRequests.isEmpty());
- Assert.assertTrue("Did not handled all of queuedRequests' requests",
- processor.queuedRequests.isEmpty());
- Assert.assertTrue("Removed from blockedQueuedRequests before commit",
- !processor.queuedWriteRequests.isEmpty());
- Assert.assertTrue("Missing session 1 in pending queue",
- processor.pendingRequests.containsKey(writeReq.sessionId));
- Assert.assertTrue("Missing session 2 in pending queue",
- processor.pendingRequests.containsKey(writeReq2.sessionId));
+ assertTrue("Processed without waiting for commit", processedRequests.isEmpty());
+ assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty());
+ assertTrue("Removed from blockedQueuedRequests before commit", !processor.queuedWriteRequests.isEmpty());
+ assertTrue("Missing session 1 in pending queue", processor.pendingRequests.containsKey(writeReq.sessionId));
+ assertTrue("Missing session 2 in pending queue", processor.pendingRequests.containsKey(writeReq2.sessionId));
processor.committedRequests.add(writeReq);
processor.committedRequests.add(writeReq2);
@@ -361,33 +352,34 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.initThreads(defaultSizeOfThreadPool);
Thread.sleep(500);
- Assert.assertTrue("Did not process committed request",
- processedRequests.peek() == writeReq);
- Assert.assertTrue("Did not process following read request",
- processedRequests.containsAll(shouldBeProcessedAfterPending));
- Assert.assertTrue("Processed committed request",
- !processor.committedRequests.isEmpty());
- Assert.assertTrue("Removed commit for write req 3",
- processor.committedRequests.peek() == writeReq3);
- Assert.assertTrue("Processed committed request",
- !processor.pendingRequests.isEmpty());
- Assert.assertTrue("Missing session 2 in pending queue",
- processor.pendingRequests.containsKey(writeReq3.sessionId));
- Assert.assertTrue("Missing write 3 in pending queue",
- processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3);
- Assert.assertTrue("Removed from blockedQueuedRequests",
- !processor.queuedWriteRequests.isEmpty());
- Assert.assertTrue("Removed write req 3 from blockedQueuedRequests",
- processor.queuedWriteRequests.peek() == writeReq3);
-
- Request readReq3 = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x1, 7);
+ assertTrue("Did not process committed request", processedRequests.peek() == writeReq);
+ assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending));
+ assertTrue("Processed committed request", !processor.committedRequests.isEmpty());
+ assertTrue("Removed commit for write req 3", processor.committedRequests.peek() == writeReq3);
+ assertTrue("Processed committed request", !processor.pendingRequests.isEmpty());
+ assertTrue("Missing session 2 in pending queue", processor.pendingRequests.containsKey(writeReq3.sessionId));
+ assertTrue(
+ "Missing write 3 in pending queue",
+ processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3);
+ assertTrue(
+ "Removed from blockedQueuedRequests",
+ !processor.queuedWriteRequests.isEmpty());
+ assertTrue(
+ "Removed write req 3 from blockedQueuedRequests",
+ processor.queuedWriteRequests.peek() == writeReq3);
+
+ Request readReq3 = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 7);
processor.queuedRequests.add(readReq3);
shouldBeProcessedAfterPending.add(readReq3);
Request writeReq4 = newRequest(
- new CreateRequest(path + "_4", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x2, 7);
+ new CreateRequest(
+ path + "_4",
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x2,
+ 7);
processor.queuedRequests.add(writeReq4);
processor.queuedWriteRequests.add(writeReq4);
@@ -399,18 +391,14 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.initThreads(defaultSizeOfThreadPool);
Thread.sleep(500);
- Assert.assertTrue("Did not process committed request",
- processedRequests.peek() == writeReq);
- Assert.assertTrue("Did not process following read request",
- processedRequests.containsAll(shouldBeProcessedAfterPending));
- Assert.assertTrue("Processed unexpected committed request",
- !processor.committedRequests.isEmpty());
- Assert.assertTrue("Unexpected pending request",
- processor.pendingRequests.isEmpty());
- Assert.assertTrue("Removed from blockedQueuedRequests",
- !processor.queuedWriteRequests.isEmpty());
- Assert.assertTrue("Removed write req 4 from blockedQueuedRequests",
- processor.queuedWriteRequests.peek() == writeReq4);
+ assertTrue("Did not process committed request", processedRequests.peek() == writeReq);
+ assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending));
+ assertTrue("Processed unexpected committed request", !processor.committedRequests.isEmpty());
+ assertTrue("Unexpected pending request", processor.pendingRequests.isEmpty());
+ assertTrue("Removed from blockedQueuedRequests", !processor.queuedWriteRequests.isEmpty());
+ assertTrue(
+ "Removed write req 4 from blockedQueuedRequests",
+ processor.queuedWriteRequests.peek() == writeReq4);
processor.stoppedMainLoop = true;
CommitProcessor.setMaxCommitBatchSize(3);
@@ -418,16 +406,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.initThreads(defaultSizeOfThreadPool);
Thread.sleep(500);
- Assert.assertTrue("Did not process committed request",
- processedRequests.peek() == writeReq);
- Assert.assertTrue("Did not process following read request",
- processedRequests.containsAll(shouldBeProcessedAfterPending));
- Assert.assertTrue("Did not process committed request",
- processor.committedRequests.isEmpty());
- Assert.assertTrue("Did not process committed request",
- processor.pendingRequests.isEmpty());
- Assert.assertTrue("Did not remove from blockedQueuedRequests",
- processor.queuedWriteRequests.isEmpty());
+ assertTrue("Did not process committed request", processedRequests.peek() == writeReq);
+ assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending));
+ assertTrue("Did not process committed request", processor.committedRequests.isEmpty());
+ assertTrue("Did not process committed request", processor.pendingRequests.isEmpty());
+ assertTrue("Did not remove from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty());
}
@@ -444,9 +427,10 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
Set<Request> nonLocalCommits = new HashSet<Request>();
for (int i = 0; i < 10; i++) {
Request nonLocalCommitReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 51, i + 1);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 51,
+ i + 1);
processor.committedRequests.add(nonLocalCommitReq);
nonLocalCommits.add(nonLocalCommitReq);
}
@@ -455,8 +439,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.stoppedMainLoop = true;
processor.run();
}
- Assert.assertTrue("commit request was not processed",
- processedRequests.containsAll(nonLocalCommits));
+ assertTrue("commit request was not processed", processedRequests.containsAll(nonLocalCommits));
}
/**
@@ -474,42 +457,47 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
// +1 committed requests (also head of queuedRequests)
Request firstCommittedReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x3, 1);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x3,
+ 1);
processor.queuedRequests.add(firstCommittedReq);
processor.queuedWriteRequests.add(firstCommittedReq);
processor.committedRequests.add(firstCommittedReq);
Set<Request> allReads = new HashSet<Request>();
// +1 read request to queuedRequests
- Request firstRead = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x1, 0);
+ Request firstRead = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 0);
allReads.add(firstRead);
processor.queuedRequests.add(firstRead);
// +1 non local commit
Request secondCommittedReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x99, 2);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x99,
+ 2);
processor.committedRequests.add(secondCommittedReq);
Set<Request> waitingCommittedRequests = new HashSet<Request>();
// +99 non local committed requests
for (int writeReqId = 3; writeReqId < 102; ++writeReqId) {
Request writeReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, 0x8, writeReqId);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ 0x8,
+ writeReqId);
processor.committedRequests.add(writeReq);
waitingCommittedRequests.add(writeReq);
}
// +50 read requests to queuedRequests
for (int readReqId = 1; readReqId <= 50; ++readReqId) {
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, 0x5, readReqId);
+ Request readReq = newRequest(
+ new GetDataRequest(path, false),
+ OpCode.getData,
+ 0x5,
+ readReqId);
allReads.add(readReq);
processor.queuedRequests.add(readReq);
}
@@ -518,20 +506,15 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.stoppedMainLoop = true;
processor.run();
- Assert.assertTrue("Did not process the first write request",
- processedRequests.contains(firstCommittedReq));
+ assertTrue("Did not process the first write request", processedRequests.contains(firstCommittedReq));
for (Request r : allReads) {
- Assert.assertTrue("Processed read request",
- !processedRequests.contains(r));
+ assertTrue("Processed read request", !processedRequests.contains(r));
}
processor.run();
- Assert.assertTrue("did not processed all reads",
- processedRequests.containsAll(allReads));
- Assert.assertTrue("Did not process the second write request",
- processedRequests.contains(secondCommittedReq));
+ assertTrue("did not processed all reads", processedRequests.containsAll(allReads));
+ assertTrue("Did not process the second write request", processedRequests.contains(secondCommittedReq));
for (Request r : waitingCommittedRequests) {
- Assert.assertTrue("Processed additional committed request",
- !processedRequests.contains(r));
+ assertTrue("Processed additional committed request", !processedRequests.contains(r));
}
}
@@ -552,49 +535,50 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
HashSet<Request> localRequests = new HashSet<Request>();
// queue the blocking write request to queuedRequests
Request firstCommittedReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, sessionid, readReqId++);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ sessionid,
+ readReqId++);
processor.queuedRequests.add(firstCommittedReq);
processor.queuedWriteRequests.add(firstCommittedReq);
localRequests.add(firstCommittedReq);
// queue read requests to queuedRequests
- for (; readReqId <= numberofReads+firstCXid; ++readReqId) {
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, sessionid, readReqId);
+ for (; readReqId <= numberofReads + firstCXid; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId);
processor.queuedRequests.add(readReq);
localRequests.add(readReq);
}
//run once
- Assert.assertTrue(processor.queuedRequests.containsAll(localRequests));
+ assertTrue(processor.queuedRequests.containsAll(localRequests));
processor.initThreads(defaultSizeOfThreadPool);
processor.run();
Thread.sleep(1000);
//We verify that the processor is waiting for the commit
- Assert.assertTrue(processedRequests.isEmpty());
+ assertTrue(processedRequests.isEmpty());
// We add a commit that belongs to the same session but with smaller cxid,
// i.e., commit of an update from previous connection of this session.
Request preSessionCommittedReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, sessionid, firstCXid - 2);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ sessionid,
+ firstCXid - 2);
processor.committedRequests.add(preSessionCommittedReq);
processor.committedRequests.add(firstCommittedReq);
processor.run();
Thread.sleep(1000);
//We verify that the commit processor processed the old commit prior to the newer messages
- Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq);
+ assertTrue(processedRequests.peek() == preSessionCommittedReq);
processor.run();
Thread.sleep(1000);
//We verify that the commit processor handle all messages.
- Assert.assertTrue(processedRequests.containsAll(localRequests));
+ assertTrue(processedRequests.containsAll(localRequests));
}
/**
@@ -617,17 +601,17 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
// queue the blocking write request to queuedRequests
Request orphanCommittedReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, sessionid, lastCXid);
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ sessionid,
+ lastCXid);
processor.queuedRequests.add(orphanCommittedReq);
processor.queuedWriteRequests.add(orphanCommittedReq);
localRequests.add(orphanCommittedReq);
// queue read requests to queuedRequests
- for (; readReqId <= numberofReads+lastCXid; ++readReqId) {
- Request readReq = newRequest(new GetDataRequest(path, false),
- OpCode.getData, sessionid, readReqId);
+ for (; readReqId <= numberofReads + lastCXid; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId);
processor.queuedRequests.add(readReq);
localRequests.add(readReq);
}
@@ -638,27 +622,33 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
Thread.sleep(1000);
//We verify that the processor is waiting for the commit
- Assert.assertTrue(processedRequests.isEmpty());
+ assertTrue(processedRequests.isEmpty());
// We add a commit that belongs to the same session but with larger cxid,
// i.e., commit of an update from the next connection of this session.
Request otherSessionCommittedReq = newRequest(
- new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
- OpCode.create, sessionid, lastCXid+10);
+ new CreateRequest(
+ path,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create,
+ sessionid,
+ lastCXid + 10);
processor.committedRequests.add(otherSessionCommittedReq);
processor.committedRequests.add(orphanCommittedReq);
processor.run();
Thread.sleep(1000);
//We verify that the commit processor processed the old commit prior to the newer messages
- Assert.assertTrue(processedRequests.size() == 1);
- Assert.assertTrue(processedRequests.contains(otherSessionCommittedReq));
+ assertTrue(processedRequests.size() == 1);
+ assertTrue(processedRequests.contains(otherSessionCommittedReq));
processor.run();
Thread.sleep(1000);
//We verify that the commit processor handle all messages.
- Assert.assertTrue(processedRequests.containsAll(localRequests));
+ assertTrue(processedRequests.containsAll(localRequests));
}
-} \ No newline at end of file
+
+}