From fe940cdd8fb23ba09684cefb73233d570f4a20fa Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 17 Aug 2019 08:13:15 -0700 Subject: ZOOKEEPER-3475: Enable Checkstyle configuration on zookeeper-server - [x] org/apache/zookeeper - [x] org/apache/zookeeper/admin - [x] org/apache/zookeeper/cli - [x] org/apache/zookeeper/client - [x] org/apache/zookeeper/common - [x] org/apache/zookeeper/jmx - [x] org/apache/zookeeper/metrics - [x] org/apache/zookeeper/metrics/impl - [x] org/apache/zookeeper/server - [x] org/apache/zookeeper/server/admin - [x] org/apache/zookeeper/server/auth - [x] org/apache/zookeeper/server/command - [x] org/apache/zookeeper/server/metric - [x] org/apache/zookeeper/server/persistence - [x] org/apache/zookeeper/server/quorum - [x] org/apache/zookeeper/server/quorum/auth - [x] org/apache/zookeeper/server/quorum/flexible - [x] org/apache/zookeeper/server/util - [x] org/apache/zookeeper/server/watch - [x] org/apache/zookeeper/test - [x] org/apache/zookeeper/util - [x] org/apache/zookeeper/version/util Author: tison Reviewers: Brian Nixon , Enrico Olivelli , Michael Han Closes #1049 from TisonKun/ZOOKEEPER-3475 --- .../quorum/CommitProcessorConcurrencyTest.java | 390 ++++++++++----------- 1 file changed, 190 insertions(+), 200 deletions(-) (limited to 'zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java') diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java index 95889ef1d..7d342a0d7 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,9 @@ package org.apache.zookeeper.server.quorum; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,7 +31,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; - import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; @@ -44,15 +46,14 @@ import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.WorkerService; import org.apache.zookeeper.server.ZooKeeperServerListener; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CommitProcessorConcurrencyTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory - .getLogger(CommitProcessorConcurrencyTest.class); + + protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class); BlockingQueue processedRequests; MockCommitProcessor processor; @@ -74,6 +75,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { // This queue is infinite if we use "poll" to get requests, but returns a // finite size when asked. class MockRequestsQueue extends LinkedBlockingQueue { + private static final long serialVersionUID = 1L; int readReqId = 0; @@ -81,12 +83,10 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { public Request poll() { readReqId++; try { - return newRequest(new GetDataRequest("/", false), - OpCode.getData, readReqId % 50, readReqId); + return newRequest(new GetDataRequest("/", false), OpCode.getData, readReqId % 50, readReqId); } catch (IOException e) { e.printStackTrace(); } - ; return null; } @@ -94,13 +94,14 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { public int size() { return 42; } + } class MockCommitProcessor extends CommitProcessor { + MockCommitProcessor() { super(new RequestProcessor() { - public void processRequest(Request request) - throws RequestProcessorException { + public void processRequest(Request request) throws RequestProcessorException { processedRequests.offer(request); } @@ -110,20 +111,19 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { @Override public void notifyStopping(String threadName, int errorCode) { - Assert.fail("Commit processor crashed " + errorCode); + fail("Commit processor crashed " + errorCode); } }); } public void initThreads(int poolSize) { this.stopped = false; - this.workerPool = new WorkerService("CommitProcWork", poolSize, - true); + this.workerPool = new WorkerService("CommitProcWork", poolSize, true); } + } - private Request newRequest(Record rec, int type, int sessionId, int xid) - throws IOException { + private Request newRequest(Record rec, int type, int sessionId, int xid) throws IOException { ByteArrayOutputStream boas = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); rec.serialize(boa, "request"); @@ -137,15 +137,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { * according to the order of the session (first read, then the write). */ @Test - public void committedAndUncommittedOfTheSameSessionRaceTest() - throws Exception { + public void committedAndUncommittedOfTheSameSessionRaceTest() throws Exception { final String path = "/testCvsUCRace"; - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x0, 0); - Request writeReq = newRequest( - new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0, - 1); + Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x0, 0); + Request writeReq = newRequest(new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0, 1); processor.committedRequests.add(writeReq); processor.queuedRequests.add(readReq); @@ -156,17 +152,13 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.stoppedMainLoop = true; processor.run(); - Assert.assertTrue( - "Request was not processed " + readReq + " instead " - + processedRequests.peek(), - processedRequests.peek() != null - && processedRequests.peek().equals(readReq)); + assertTrue( + "Request was not processed " + readReq + " instead " + processedRequests.peek(), + processedRequests.peek() != null && processedRequests.peek().equals(readReq)); processedRequests.poll(); - Assert.assertTrue( - "Request was not processed " + writeReq + " instead " - + processedRequests.peek(), - processedRequests.peek() != null - && processedRequests.peek().equals(writeReq)); + assertTrue( + "Request was not processed " + writeReq + " instead " + processedRequests.peek(), + processedRequests.peek() != null && processedRequests.peek().equals(writeReq)); } /** @@ -178,24 +170,30 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { * uncommited requests, followed by the reads are processed. */ @Test - public void processAsMuchUncommittedRequestsAsPossibleTest() - throws Exception { + public void processAsMuchUncommittedRequestsAsPossibleTest() throws Exception { final String path = "/testAsMuchAsPossible"; List shouldBeProcessed = new LinkedList(); Set shouldNotBeProcessed = new HashSet(); for (int sessionId = 1; sessionId <= 5; ++sessionId) { for (int readReqId = 1; readReqId <= sessionId; ++readReqId) { - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, sessionId, readReqId); + Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionId, readReqId); shouldBeProcessed.add(readReq); processor.queuedRequests.add(readReq); } Request writeReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, sessionId, sessionId + 1); - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, sessionId, sessionId + 2); + new CreateRequest( + path, + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + sessionId, + sessionId + 1); + Request readReq = newRequest( + new GetDataRequest(path, false), + OpCode.getData, + sessionId, + sessionId + 2); processor.queuedRequests.add(writeReq); processor.queuedWriteRequests.add(writeReq); processor.queuedRequests.add(readReq); @@ -211,10 +209,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { for (Request r : shouldBeProcessed) { LOG.error("Did not process " + r); } - Assert.assertTrue("Not all requests were processed", - shouldBeProcessed.isEmpty()); - Assert.assertFalse("Processed a wrong request", - shouldNotBeProcessed.removeAll(processedRequests)); + assertTrue("Not all requests were processed", shouldBeProcessed.isEmpty()); + assertFalse("Processed a wrong request", shouldNotBeProcessed.removeAll(processedRequests)); } /** @@ -225,23 +221,22 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { * executed after the write, before any other write, along with new reads. */ @Test - public void processAllFollowingUncommittedAfterFirstCommitTest() - throws Exception { + public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception { final String path = "/testUncommittedFollowingCommited"; Set shouldBeInPending = new HashSet(); Set shouldBeProcessedAfterPending = new HashSet(); Request writeReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x1, 1); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x1, + 1); processor.queuedRequests.add(writeReq); processor.queuedWriteRequests.add(writeReq); shouldBeInPending.add(writeReq); for (int readReqId = 2; readReqId <= 5; ++readReqId) { - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x1, readReqId); + Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, readReqId); processor.queuedRequests.add(readReq); shouldBeInPending.add(readReq); shouldBeProcessedAfterPending.add(readReq); @@ -250,21 +245,15 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.stoppedMainLoop = true; processor.run(); - Assert.assertTrue("Processed without waiting for commit", - processedRequests.isEmpty()); - Assert.assertTrue("Did not handled all of queuedRequests' requests", - processor.queuedRequests.isEmpty()); - Assert.assertTrue("Removed from blockedQueuedRequests before commit", - !processor.queuedWriteRequests.isEmpty()); - - shouldBeInPending - .removeAll(processor.pendingRequests.get(writeReq.sessionId)); + assertTrue("Processed without waiting for commit", processedRequests.isEmpty()); + assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty()); + assertTrue("Removed from blockedQueuedRequests before commit", !processor.queuedWriteRequests.isEmpty()); + + shouldBeInPending.removeAll(processor.pendingRequests.get(writeReq.sessionId)); for (Request r : shouldBeInPending) { LOG.error("Should be in pending " + r); } - Assert.assertTrue( - "Not all requests moved to pending from queuedRequests", - shouldBeInPending.isEmpty()); + assertTrue("Not all requests moved to pending from queuedRequests", shouldBeInPending.isEmpty()); processor.committedRequests.add(writeReq); processor.stoppedMainLoop = true; @@ -272,16 +261,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.initThreads(defaultSizeOfThreadPool); Thread.sleep(500); - Assert.assertTrue("Did not process committed request", - processedRequests.peek() == writeReq); - Assert.assertTrue("Did not process following read request", - processedRequests.containsAll(shouldBeProcessedAfterPending)); - Assert.assertTrue("Did not process committed request", - processor.committedRequests.isEmpty()); - Assert.assertTrue("Did not process committed request", - processor.pendingRequests.isEmpty()); - Assert.assertTrue("Did not remove from blockedQueuedRequests", - processor.queuedWriteRequests.isEmpty()); + assertTrue("Did not process committed request", processedRequests.peek() == writeReq); + assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending)); + assertTrue("Did not process committed request", processor.committedRequests.isEmpty()); + assertTrue("Did not process committed request", processor.pendingRequests.isEmpty()); + assertTrue("Did not remove from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty()); } /** @@ -299,30 +283,37 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { * in the last iteration, and all lists are empty. */ @Test - public void processAllWritesMaxBatchSize() - throws Exception { + public void processAllWritesMaxBatchSize() throws Exception { final String path = "/processAllWritesMaxBatchSize"; HashSet shouldBeProcessedAfterPending = new HashSet(); Request writeReq = newRequest( - new CreateRequest(path + "_1", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x1, 1); + new CreateRequest( + path + "_1", + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x1, + 1); processor.queuedRequests.add(writeReq); processor.queuedWriteRequests.add(writeReq); Request writeReq2 = newRequest( - new CreateRequest(path + "_2", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x2, 1); + new CreateRequest( + path + "_2", + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x2, + 1); processor.queuedRequests.add(writeReq2); processor.queuedWriteRequests.add(writeReq2); for (int readReqId = 2; readReqId <= 5; ++readReqId) { - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x1, readReqId); - Request readReq2 = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x2, readReqId); + Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, readReqId); + Request readReq2 = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x2, readReqId); processor.queuedRequests.add(readReq); shouldBeProcessedAfterPending.add(readReq); processor.queuedRequests.add(readReq2); @@ -330,9 +321,14 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { } Request writeReq3 = newRequest( - new CreateRequest(path + "_3", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x2, 6); + new CreateRequest( + path + "_3", + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x2, + 6); processor.queuedRequests.add(writeReq3); processor.queuedWriteRequests.add(writeReq3); @@ -341,16 +337,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.stoppedMainLoop = true; CommitProcessor.setMaxCommitBatchSize(2); processor.run(); - Assert.assertTrue("Processed without waiting for commit", - processedRequests.isEmpty()); - Assert.assertTrue("Did not handled all of queuedRequests' requests", - processor.queuedRequests.isEmpty()); - Assert.assertTrue("Removed from blockedQueuedRequests before commit", - !processor.queuedWriteRequests.isEmpty()); - Assert.assertTrue("Missing session 1 in pending queue", - processor.pendingRequests.containsKey(writeReq.sessionId)); - Assert.assertTrue("Missing session 2 in pending queue", - processor.pendingRequests.containsKey(writeReq2.sessionId)); + assertTrue("Processed without waiting for commit", processedRequests.isEmpty()); + assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty()); + assertTrue("Removed from blockedQueuedRequests before commit", !processor.queuedWriteRequests.isEmpty()); + assertTrue("Missing session 1 in pending queue", processor.pendingRequests.containsKey(writeReq.sessionId)); + assertTrue("Missing session 2 in pending queue", processor.pendingRequests.containsKey(writeReq2.sessionId)); processor.committedRequests.add(writeReq); processor.committedRequests.add(writeReq2); @@ -361,33 +352,34 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.initThreads(defaultSizeOfThreadPool); Thread.sleep(500); - Assert.assertTrue("Did not process committed request", - processedRequests.peek() == writeReq); - Assert.assertTrue("Did not process following read request", - processedRequests.containsAll(shouldBeProcessedAfterPending)); - Assert.assertTrue("Processed committed request", - !processor.committedRequests.isEmpty()); - Assert.assertTrue("Removed commit for write req 3", - processor.committedRequests.peek() == writeReq3); - Assert.assertTrue("Processed committed request", - !processor.pendingRequests.isEmpty()); - Assert.assertTrue("Missing session 2 in pending queue", - processor.pendingRequests.containsKey(writeReq3.sessionId)); - Assert.assertTrue("Missing write 3 in pending queue", - processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3); - Assert.assertTrue("Removed from blockedQueuedRequests", - !processor.queuedWriteRequests.isEmpty()); - Assert.assertTrue("Removed write req 3 from blockedQueuedRequests", - processor.queuedWriteRequests.peek() == writeReq3); - - Request readReq3 = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x1, 7); + assertTrue("Did not process committed request", processedRequests.peek() == writeReq); + assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending)); + assertTrue("Processed committed request", !processor.committedRequests.isEmpty()); + assertTrue("Removed commit for write req 3", processor.committedRequests.peek() == writeReq3); + assertTrue("Processed committed request", !processor.pendingRequests.isEmpty()); + assertTrue("Missing session 2 in pending queue", processor.pendingRequests.containsKey(writeReq3.sessionId)); + assertTrue( + "Missing write 3 in pending queue", + processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3); + assertTrue( + "Removed from blockedQueuedRequests", + !processor.queuedWriteRequests.isEmpty()); + assertTrue( + "Removed write req 3 from blockedQueuedRequests", + processor.queuedWriteRequests.peek() == writeReq3); + + Request readReq3 = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 7); processor.queuedRequests.add(readReq3); shouldBeProcessedAfterPending.add(readReq3); Request writeReq4 = newRequest( - new CreateRequest(path + "_4", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x2, 7); + new CreateRequest( + path + "_4", + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x2, + 7); processor.queuedRequests.add(writeReq4); processor.queuedWriteRequests.add(writeReq4); @@ -399,18 +391,14 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.initThreads(defaultSizeOfThreadPool); Thread.sleep(500); - Assert.assertTrue("Did not process committed request", - processedRequests.peek() == writeReq); - Assert.assertTrue("Did not process following read request", - processedRequests.containsAll(shouldBeProcessedAfterPending)); - Assert.assertTrue("Processed unexpected committed request", - !processor.committedRequests.isEmpty()); - Assert.assertTrue("Unexpected pending request", - processor.pendingRequests.isEmpty()); - Assert.assertTrue("Removed from blockedQueuedRequests", - !processor.queuedWriteRequests.isEmpty()); - Assert.assertTrue("Removed write req 4 from blockedQueuedRequests", - processor.queuedWriteRequests.peek() == writeReq4); + assertTrue("Did not process committed request", processedRequests.peek() == writeReq); + assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending)); + assertTrue("Processed unexpected committed request", !processor.committedRequests.isEmpty()); + assertTrue("Unexpected pending request", processor.pendingRequests.isEmpty()); + assertTrue("Removed from blockedQueuedRequests", !processor.queuedWriteRequests.isEmpty()); + assertTrue( + "Removed write req 4 from blockedQueuedRequests", + processor.queuedWriteRequests.peek() == writeReq4); processor.stoppedMainLoop = true; CommitProcessor.setMaxCommitBatchSize(3); @@ -418,16 +406,11 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.initThreads(defaultSizeOfThreadPool); Thread.sleep(500); - Assert.assertTrue("Did not process committed request", - processedRequests.peek() == writeReq); - Assert.assertTrue("Did not process following read request", - processedRequests.containsAll(shouldBeProcessedAfterPending)); - Assert.assertTrue("Did not process committed request", - processor.committedRequests.isEmpty()); - Assert.assertTrue("Did not process committed request", - processor.pendingRequests.isEmpty()); - Assert.assertTrue("Did not remove from blockedQueuedRequests", - processor.queuedWriteRequests.isEmpty()); + assertTrue("Did not process committed request", processedRequests.peek() == writeReq); + assertTrue("Did not process following read request", processedRequests.containsAll(shouldBeProcessedAfterPending)); + assertTrue("Did not process committed request", processor.committedRequests.isEmpty()); + assertTrue("Did not process committed request", processor.pendingRequests.isEmpty()); + assertTrue("Did not remove from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty()); } @@ -444,9 +427,10 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { Set nonLocalCommits = new HashSet(); for (int i = 0; i < 10; i++) { Request nonLocalCommitReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 51, i + 1); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 51, + i + 1); processor.committedRequests.add(nonLocalCommitReq); nonLocalCommits.add(nonLocalCommitReq); } @@ -455,8 +439,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.stoppedMainLoop = true; processor.run(); } - Assert.assertTrue("commit request was not processed", - processedRequests.containsAll(nonLocalCommits)); + assertTrue("commit request was not processed", processedRequests.containsAll(nonLocalCommits)); } /** @@ -474,42 +457,47 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { // +1 committed requests (also head of queuedRequests) Request firstCommittedReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x3, 1); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x3, + 1); processor.queuedRequests.add(firstCommittedReq); processor.queuedWriteRequests.add(firstCommittedReq); processor.committedRequests.add(firstCommittedReq); Set allReads = new HashSet(); // +1 read request to queuedRequests - Request firstRead = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x1, 0); + Request firstRead = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 0); allReads.add(firstRead); processor.queuedRequests.add(firstRead); // +1 non local commit Request secondCommittedReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x99, 2); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x99, + 2); processor.committedRequests.add(secondCommittedReq); Set waitingCommittedRequests = new HashSet(); // +99 non local committed requests for (int writeReqId = 3; writeReqId < 102; ++writeReqId) { Request writeReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, 0x8, writeReqId); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + 0x8, + writeReqId); processor.committedRequests.add(writeReq); waitingCommittedRequests.add(writeReq); } // +50 read requests to queuedRequests for (int readReqId = 1; readReqId <= 50; ++readReqId) { - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, 0x5, readReqId); + Request readReq = newRequest( + new GetDataRequest(path, false), + OpCode.getData, + 0x5, + readReqId); allReads.add(readReq); processor.queuedRequests.add(readReq); } @@ -518,20 +506,15 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.stoppedMainLoop = true; processor.run(); - Assert.assertTrue("Did not process the first write request", - processedRequests.contains(firstCommittedReq)); + assertTrue("Did not process the first write request", processedRequests.contains(firstCommittedReq)); for (Request r : allReads) { - Assert.assertTrue("Processed read request", - !processedRequests.contains(r)); + assertTrue("Processed read request", !processedRequests.contains(r)); } processor.run(); - Assert.assertTrue("did not processed all reads", - processedRequests.containsAll(allReads)); - Assert.assertTrue("Did not process the second write request", - processedRequests.contains(secondCommittedReq)); + assertTrue("did not processed all reads", processedRequests.containsAll(allReads)); + assertTrue("Did not process the second write request", processedRequests.contains(secondCommittedReq)); for (Request r : waitingCommittedRequests) { - Assert.assertTrue("Processed additional committed request", - !processedRequests.contains(r)); + assertTrue("Processed additional committed request", !processedRequests.contains(r)); } } @@ -552,49 +535,50 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { HashSet localRequests = new HashSet(); // queue the blocking write request to queuedRequests Request firstCommittedReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, sessionid, readReqId++); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + sessionid, + readReqId++); processor.queuedRequests.add(firstCommittedReq); processor.queuedWriteRequests.add(firstCommittedReq); localRequests.add(firstCommittedReq); // queue read requests to queuedRequests - for (; readReqId <= numberofReads+firstCXid; ++readReqId) { - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, sessionid, readReqId); + for (; readReqId <= numberofReads + firstCXid; ++readReqId) { + Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId); processor.queuedRequests.add(readReq); localRequests.add(readReq); } //run once - Assert.assertTrue(processor.queuedRequests.containsAll(localRequests)); + assertTrue(processor.queuedRequests.containsAll(localRequests)); processor.initThreads(defaultSizeOfThreadPool); processor.run(); Thread.sleep(1000); //We verify that the processor is waiting for the commit - Assert.assertTrue(processedRequests.isEmpty()); + assertTrue(processedRequests.isEmpty()); // We add a commit that belongs to the same session but with smaller cxid, // i.e., commit of an update from previous connection of this session. Request preSessionCommittedReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, sessionid, firstCXid - 2); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + sessionid, + firstCXid - 2); processor.committedRequests.add(preSessionCommittedReq); processor.committedRequests.add(firstCommittedReq); processor.run(); Thread.sleep(1000); //We verify that the commit processor processed the old commit prior to the newer messages - Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq); + assertTrue(processedRequests.peek() == preSessionCommittedReq); processor.run(); Thread.sleep(1000); //We verify that the commit processor handle all messages. - Assert.assertTrue(processedRequests.containsAll(localRequests)); + assertTrue(processedRequests.containsAll(localRequests)); } /** @@ -617,17 +601,17 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { // queue the blocking write request to queuedRequests Request orphanCommittedReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, sessionid, lastCXid); + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + sessionid, + lastCXid); processor.queuedRequests.add(orphanCommittedReq); processor.queuedWriteRequests.add(orphanCommittedReq); localRequests.add(orphanCommittedReq); // queue read requests to queuedRequests - for (; readReqId <= numberofReads+lastCXid; ++readReqId) { - Request readReq = newRequest(new GetDataRequest(path, false), - OpCode.getData, sessionid, readReqId); + for (; readReqId <= numberofReads + lastCXid; ++readReqId) { + Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId); processor.queuedRequests.add(readReq); localRequests.add(readReq); } @@ -638,27 +622,33 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { Thread.sleep(1000); //We verify that the processor is waiting for the commit - Assert.assertTrue(processedRequests.isEmpty()); + assertTrue(processedRequests.isEmpty()); // We add a commit that belongs to the same session but with larger cxid, // i.e., commit of an update from the next connection of this session. Request otherSessionCommittedReq = newRequest( - new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), - OpCode.create, sessionid, lastCXid+10); + new CreateRequest( + path, + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, + sessionid, + lastCXid + 10); processor.committedRequests.add(otherSessionCommittedReq); processor.committedRequests.add(orphanCommittedReq); processor.run(); Thread.sleep(1000); //We verify that the commit processor processed the old commit prior to the newer messages - Assert.assertTrue(processedRequests.size() == 1); - Assert.assertTrue(processedRequests.contains(otherSessionCommittedReq)); + assertTrue(processedRequests.size() == 1); + assertTrue(processedRequests.contains(otherSessionCommittedReq)); processor.run(); Thread.sleep(1000); //We verify that the commit processor handle all messages. - Assert.assertTrue(processedRequests.containsAll(localRequests)); + assertTrue(processedRequests.containsAll(localRequests)); } -} \ No newline at end of file + +} -- cgit v1.2.1