summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortison <wander4096@gmail.com>2022-10-05 17:55:54 +0200
committerMate Szalay-Beko <symat@apache.com>2022-10-05 17:55:54 +0200
commit3daefac37e8a7b456542c91adea541a938df1214 (patch)
treede648e6e681a3f9b9af36a8daec06d9e1985bd0d
parente2bc3dd1618405a67e9b412f8ef67eb84141eb76 (diff)
downloadzookeeper-3daefac37e8a7b456542c91adea541a938df1214.tar.gz
ZOOKEEPER-4575: ZooKeeperServer#processPacket take record instead of bytes
This is the first step mentioned in [ZOOKEEPER-102](https://issues.apache.org/jira/browse/ZOOKEEPER-102) and we should not play with ByteBuffer among the request handling code path. Author: tison <wander4096@gmail.com> Reviewers: Enrico Olivelli <eolivelli@apache.org>, Mate Szalay-Beko <symat@apache.org> Closes #1905 from tisonkun/request-supplier
-rw-r--r--.travis.yml20
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java51
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java22
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java64
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java7
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java54
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java6
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java8
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java97
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java62
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java46
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java68
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java75
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java4
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java5
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java6
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java14
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java6
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java26
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java12
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java4
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java10
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java27
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java3
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java3
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java6
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java5
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java6
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java6
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java4
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java5
31 files changed, 438 insertions, 294 deletions
diff --git a/.travis.yml b/.travis.yml
index 81471d9ed..7fd2dcd02 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,15 +5,14 @@ matrix:
- os: linux
arch: arm64
jdk: openjdk11
+ install: |
+ f=$(which javac)
+ while [[ -L $f ]]; do f=$(readlink $f); done
+ export JAVA_HOME=${f%/bin/*}
+
- os: linux
arch: s390x
jdk: openjdk11
- addons:
- apt:
- update: true
- packages:
- - maven
- - libcppunit-dev
cache:
directories:
@@ -21,13 +20,10 @@ cache:
addons:
apt:
+ update: true
packages:
- - libcppunit-dev
-
-install:
- - if [ "${TRAVIS_CPU_ARCH}" == "arm64" ]; then
- sudo apt-get install maven;
- fi
+ - maven
+ - libcppunit-dev
script: mvn clean apache-rat:check verify -DskipTests spotbugs:check checkstyle:check -Pfull-build
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java
new file mode 100644
index 000000000..b7fd12689
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+
+public class DeleteContainerRequest implements Record {
+ private String path;
+
+ public DeleteContainerRequest() {
+ }
+
+ public DeleteContainerRequest(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public void serialize(OutputArchive archive, String tag) throws IOException {
+ archive.writeBuffer(path.getBytes(StandardCharsets.UTF_8), "path");
+ }
+
+ @Override
+ public void deserialize(InputArchive archive, String tag) throws IOException {
+ byte[] bytes = archive.readBuffer("path");
+ path = new String(bytes, StandardCharsets.UTF_8);
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
index 5aca1711f..d6df7d924 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
@@ -20,7 +20,6 @@ package org.apache.zookeeper.audit;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MultiOperationRecord;
@@ -58,7 +57,7 @@ public final class AuditHelper {
if (!ZKAuditProvider.isAuditEnabled()) {
return;
}
- String op = null;
+ String op;
//For failed transaction rc.path is null
String path = txnResult.path;
String acls = null;
@@ -69,8 +68,7 @@ public final class AuditHelper {
case ZooDefs.OpCode.create2:
case ZooDefs.OpCode.createContainer:
op = AuditConstants.OP_CREATE;
- CreateRequest createRequest = new CreateRequest();
- deserialize(request, createRequest);
+ CreateRequest createRequest = request.readRequestRecord(CreateRequest::new);
createMode = getCreateMode(createRequest);
if (failedTxn) {
path = createRequest.getPath();
@@ -80,23 +78,20 @@ public final class AuditHelper {
case ZooDefs.OpCode.deleteContainer:
op = AuditConstants.OP_DELETE;
if (failedTxn) {
- DeleteRequest deleteRequest = new DeleteRequest();
- deserialize(request, deleteRequest);
+ DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
path = deleteRequest.getPath();
}
break;
case ZooDefs.OpCode.setData:
op = AuditConstants.OP_SETDATA;
if (failedTxn) {
- SetDataRequest setDataRequest = new SetDataRequest();
- deserialize(request, setDataRequest);
+ SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
path = setDataRequest.getPath();
}
break;
case ZooDefs.OpCode.setACL:
op = AuditConstants.OP_SETACL;
- SetACLRequest setACLRequest = new SetACLRequest();
- deserialize(request, setACLRequest);
+ SetACLRequest setACLRequest = request.readRequestRecord(SetACLRequest::new);
acls = ZKUtil.aclToString(setACLRequest.getAcl());
if (failedTxn) {
path = setACLRequest.getPath();
@@ -125,10 +120,6 @@ public final class AuditHelper {
}
}
- private static void deserialize(Request request, Record record) throws IOException {
- request.readRequestRecord(record);
- }
-
private static Result getResult(ProcessTxnResult rc, boolean failedTxn) {
if (failedTxn) {
return Result.FAILURE;
@@ -191,8 +182,7 @@ public final class AuditHelper {
if (!ZKAuditProvider.isAuditEnabled()) {
return createModes;
}
- MultiOperationRecord multiRequest = new MultiOperationRecord();
- deserialize(request, multiRequest);
+ MultiOperationRecord multiRequest = request.readRequestRecord(MultiOperationRecord::new);
for (Op op : multiRequest) {
if (op.getType() == ZooDefs.OpCode.create || op.getType() == ZooDefs.OpCode.create2
|| op.getType() == ZooDefs.OpCode.createContainer) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java
new file mode 100644
index 000000000..5ddae2460
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *uuuuu
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+import org.apache.jute.Record;
+
+public class ByteBufferRequestRecord implements RequestRecord {
+
+ private final ByteBuffer request;
+
+ private volatile Record record;
+
+ public ByteBufferRequestRecord(ByteBuffer request) {
+ this.request = request;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends Record> T readRecord(Supplier<T> constructor) throws IOException {
+ if (record != null) {
+ return (T) record;
+ }
+
+ record = constructor.get();
+ request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request, record);
+ request.rewind();
+ return (T) record;
+ }
+
+ @Override
+ public byte[] readBytes() {
+ request.rewind();
+ int len = request.remaining();
+ byte[] b = new byte[len];
+ request.get(b);
+ request.rewind();
+ return b;
+ }
+
+ @Override
+ public int limit() {
+ return request.limit();
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java
index 7abac587a..2664348c2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java
@@ -18,8 +18,6 @@
package org.apache.zookeeper.server;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -27,6 +25,7 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.zookeeper.DeleteContainerRequest;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.Time;
import org.slf4j.Logger;
@@ -129,8 +128,8 @@ public class ContainerManager {
for (String containerPath : getCandidates()) {
long startMs = Time.currentElapsedTime();
- ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes(UTF_8));
- Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, path, null);
+ DeleteContainerRequest record = new DeleteContainerRequest(containerPath);
+ Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, RequestRecord.fromRecord(record), null);
try {
LOG.info("Attempting to delete candidate container: {}", containerPath);
postDeleteRequest(request);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index bc5b019f8..693a6b86e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -269,8 +269,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.multiRead: {
lastOp = "MLTR";
- MultiOperationRecord multiReadRecord = new MultiOperationRecord();
- request.readRequestRecord(multiReadRecord);
+ MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
@@ -348,8 +347,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.sync: {
lastOp = "SYNC";
- SyncRequest syncRequest = new SyncRequest();
- request.readRequestRecord(syncRequest);
+ SyncRequest syncRequest = request.readRequestRecord(SyncRequest::new);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
@@ -363,8 +361,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
- ExistsRequest existsRequest = new ExistsRequest();
- request.readRequestRecord(existsRequest);
+ ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
@@ -376,8 +373,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.getData: {
lastOp = "GETD";
- GetDataRequest getDataRequest = new GetDataRequest();
- request.readRequestRecord(getDataRequest);
+ GetDataRequest getDataRequest = request.readRequestRecord(GetDataRequest::new);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
@@ -385,8 +381,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.setWatches: {
lastOp = "SETW";
- SetWatches setWatches = new SetWatches();
- request.readRequestRecord(setWatches);
+ SetWatches setWatches = request.readRequestRecord(SetWatches::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
@@ -401,8 +396,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.setWatches2: {
lastOp = "STW2";
- SetWatches2 setWatches = new SetWatches2();
- request.readRequestRecord(setWatches);
+ SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
@@ -415,16 +409,14 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.addWatch: {
lastOp = "ADDW";
- AddWatchRequest addWatcherRequest = new AddWatchRequest();
- request.readRequestRecord(addWatcherRequest);
+ AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
- GetACLRequest getACLRequest = new GetACLRequest();
- request.readRequestRecord(getACLRequest);
+ GetACLRequest getACLRequest = request.readRequestRecord(GetACLRequest::new);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
@@ -466,8 +458,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.getChildren: {
lastOp = "GETC";
- GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
- request.readRequestRecord(getChildrenRequest);
+ GetChildrenRequest getChildrenRequest = request.readRequestRecord(GetChildrenRequest::new);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
@@ -475,8 +466,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
- GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
- request.readRequestRecord(getAllChildrenNumberRequest);
+ GetAllChildrenNumberRequest getAllChildrenNumberRequest = request.readRequestRecord(GetAllChildrenNumberRequest::new);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
@@ -495,8 +485,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.getChildren2: {
lastOp = "GETC";
- GetChildren2Request getChildren2Request = new GetChildren2Request();
- request.readRequestRecord(getChildren2Request);
+ GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
@@ -517,8 +506,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.checkWatches: {
lastOp = "CHKW";
- CheckWatchesRequest checkWatches = new CheckWatchesRequest();
- request.readRequestRecord(checkWatches);
+ CheckWatchesRequest checkWatches = request.readRequestRecord(CheckWatchesRequest::new);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
@@ -531,8 +519,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.removeWatches: {
lastOp = "REMW";
- RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
- request.readRequestRecord(removeWatches);
+ RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
@@ -550,8 +537,7 @@ public class FinalRequestProcessor implements RequestProcessor {
}
case OpCode.getEphemerals: {
lastOp = "GETE";
- GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
- request.readRequestRecord(getEphemerals);
+ GetEphemeralsRequest getEphemerals = request.readRequestRecord(GetEphemeralsRequest::new);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
@@ -585,16 +571,8 @@ public class FinalRequestProcessor implements RequestProcessor {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
- StringBuilder sb = new StringBuilder();
- byte[] payload = request.readRequestBytes();
- if (payload != null) {
- for (byte b : payload) {
- sb.append(String.format("%02x", (0xff & b)));
- }
- } else {
- sb.append("request buffer is null");
- }
- LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
+ String digest = request.requestDigest();
+ LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
err = Code.MARSHALLINGERROR;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index 83e7491e5..5ffc81da1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -42,6 +42,7 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
import org.apache.zookeeper.server.command.CommandExecutor;
@@ -392,7 +393,10 @@ public class NIOServerCnxn extends ServerCnxn {
}
protected void readRequest() throws IOException {
- zkServer.processPacket(this, incomingBuffer);
+ RequestHeader h = new RequestHeader();
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
+ RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
+ zkServer.processPacket(this, h, request);
}
// returns whether we are interested in writing, which is determined
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index ae482ce2b..f95200d56 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -47,6 +47,7 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.command.CommandExecutor;
import org.apache.zookeeper.server.command.FourLetterCommands;
@@ -478,9 +479,10 @@ public class NettyServerCnxn extends ServerCnxn {
throw new IOException("ZK down");
}
if (initialized) {
- // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
- // we could implement zero-copy queueing.
- zks.processPacket(this, bb);
+ RequestHeader h = new RequestHeader();
+ ByteBufferInputStream.byteBuffer2Record(bb, h);
+ RequestRecord request = RequestRecord.fromBytes(bb.slice());
+ zks.processPacket(this, h, request);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 9733a48ac..35293359f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -18,7 +18,6 @@
package org.apache.zookeeper.server;
-import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
@@ -36,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.DeleteContainerRequest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.KeeperException.Code;
@@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
*/
private static boolean failCreate = false;
- LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
+ LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();
private final RequestProcessor nextProcessor;
private final boolean digestEnabled;
@@ -311,13 +311,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
/**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
- *
- * @param type
- * @param zxid
- * @param request
- * @param record
*/
- protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
+ protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException {
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
@@ -328,11 +323,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
- pRequest2TxnCreate(type, request, record, deserialize);
+ pRequest2TxnCreate(type, request, record);
break;
}
case OpCode.deleteContainer: {
- String path = new String(request.readRequestBytes(), UTF_8);
+ DeleteContainerRequest txn = (DeleteContainerRequest) record;
+ String path = txn.getPath();
String parentPath = getParentPathAndValidate(path);
ChangeRecord nodeRecord = getRecordForPath(path);
if (nodeRecord.childCount > 0) {
@@ -359,9 +355,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.delete:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
DeleteRequest deleteRequest = (DeleteRequest) record;
- if (deserialize) {
- request.readRequestRecord(deleteRequest);
- }
String path = deleteRequest.getPath();
String parentPath = getParentPathAndValidate(path);
ChangeRecord parentRecord = getRecordForPath(parentPath);
@@ -387,9 +380,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest) record;
- if (deserialize) {
- request.readRequestRecord(setDataRequest);
- }
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
@@ -559,9 +549,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.setACL:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetACLRequest setAclRequest = (SetACLRequest) record;
- if (deserialize) {
- request.readRequestRecord(setAclRequest);
- }
path = setAclRequest.getPath();
validatePath(path, request.sessionId);
List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
@@ -577,8 +564,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
- CreateSessionTxn createSessionTxn = new CreateSessionTxn();
- request.readRequestRecord(createSessionTxn);
+ CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
@@ -630,9 +616,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.check:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
- if (deserialize) {
- request.readRequestRecord(checkVersionRequest);
- }
path = checkVersionRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
@@ -653,11 +636,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
}
}
- private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
- if (deserialize) {
- request.readRequestRecord(record);
- }
-
+ private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException {
int flags;
String path;
List<ACL> acl;
@@ -792,39 +771,41 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
- CreateRequest create2Request = new CreateRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
+ CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
- CreateTTLRequest createTtlRequest = new CreateTTLRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
+ CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
+ DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
+ break;
case OpCode.delete:
- DeleteRequest deleteRequest = new DeleteRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
+ DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
- SetDataRequest setDataRequest = new SetDataRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
+ SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
- ReconfigRequest reconfigRequest = new ReconfigRequest();
- request.readRequestRecord(reconfigRequest);
- pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
+ ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
- SetACLRequest setAclRequest = new SetACLRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
+ SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
break;
case OpCode.check:
- CheckVersionRequest checkRequest = new CheckVersionRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
+ CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
break;
case OpCode.multi:
- MultiOperationRecord multiRequest = new MultiOperationRecord();
+ MultiOperationRecord multiRequest;
try {
- request.readRequestRecord(multiRequest);
+ multiRequest = request.readRequestRecord(MultiOperationRecord::new);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
@@ -854,7 +835,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
} else {
/* Prep the request and convert to a Txn */
try {
- pRequest2Txn(op.getType(), zxid, request, subrequest, false);
+ pRequest2Txn(op.getType(), zxid, request, subrequest);
type = op.getType();
txn = request.getTxn();
} catch (KeeperException e) {
@@ -899,7 +880,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
- pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, null);
}
break;
@@ -944,20 +925,14 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
- StringBuilder sb = new StringBuilder();
- byte[] payload = request.readRequestBytes();
- if (payload != null) {
- for (byte b : payload) {
- sb.append(String.format("%02x", (0xff & b)));
- }
- } else {
- sb.append("request buffer is null");
- }
- LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
- if (request.getHdr() != null) {
- request.getHdr().setType(OpCode.error);
- request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
+ String digest = request.requestDigest();
+ LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
+ if (request.getHdr() == null) {
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
}
+
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 1aee6aee2..86a50fc55 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.function.Supplier;
import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
@@ -52,12 +53,12 @@ public class Request {
// associated session timeout. Disabled by default.
private static volatile boolean staleLatencyCheck = Boolean.parseBoolean(System.getProperty("zookeeper.request_stale_latency_check", "false"));
- public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
+ public Request(ServerCnxn cnxn, long sessionId, int xid, int type, RequestRecord request, List<Id> authInfo) {
this.cnxn = cnxn;
this.sessionId = sessionId;
this.cxid = xid;
this.type = type;
- this.request = bb;
+ this.request = request;
this.authInfo = authInfo;
}
@@ -79,30 +80,42 @@ public class Request {
public final int type;
- private final ByteBuffer request;
+ private final RequestRecord request;
- public void readRequestRecord(Record record) throws IOException {
+ public <T extends Record> T readRequestRecord(Supplier<T> constructor) throws IOException {
if (request != null) {
- request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request, record);
- request.rewind();
- return;
+ return request.readRecord(constructor);
}
throw new IOException(new NullPointerException("request"));
}
+ public <T extends Record> T readRequestRecordNoException(Supplier<T> constructor) {
+ try {
+ return readRequestRecord(constructor);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
public byte[] readRequestBytes() {
if (request != null) {
- request.rewind();
- int len = request.remaining();
- byte[] b = new byte[len];
- request.get(b);
- request.rewind();
- return b;
+ return request.readBytes();
}
return null;
}
+ public String requestDigest() {
+ if (request != null) {
+ final StringBuilder sb = new StringBuilder();
+ final byte[] payload = request.readBytes();
+ for (byte b : payload) {
+ sb.append(String.format("%02x", (0xff & b)));
+ }
+ return sb.toString();
+ }
+ return "request buffer is null";
+ }
+
public final ServerCnxn cnxn;
private TxnHeader hdr;
@@ -423,18 +436,19 @@ public class Request {
&& type != OpCode.setWatches
&& type != OpCode.setWatches2
&& type != OpCode.closeSession
- && request != null
- && request.remaining() >= 4) {
+ && request != null) {
try {
// make sure we don't mess with request itself
- ByteBuffer rbuf = request.asReadOnlyBuffer();
- rbuf.clear();
- int pathLen = rbuf.getInt();
- // sanity check
- if (pathLen >= 0 && pathLen < 4096 && rbuf.remaining() >= pathLen) {
- byte[] b = new byte[pathLen];
- rbuf.get(b);
- path = new String(b, UTF_8);
+ byte[] bytes = request.readBytes();
+ if (bytes != null && bytes.length >= 4) {
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ int pathLen = buf.getInt();
+ // sanity check
+ if (pathLen >= 0 && pathLen < 4096 && buf.remaining() >= pathLen) {
+ byte[] b = new byte[pathLen];
+ buf.get(b);
+ path = new String(b, UTF_8);
+ }
}
} catch (Exception e) {
// ignore - can't find the path, will output "n/a" instead
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java
new file mode 100644
index 000000000..6265f168e
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *uuuuu
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+import org.apache.jute.Record;
+
+public interface RequestRecord {
+
+ static RequestRecord fromBytes(ByteBuffer buffer) {
+ return new ByteBufferRequestRecord(buffer);
+ }
+
+ static RequestRecord fromBytes(byte[] bytes) {
+ return fromBytes(ByteBuffer.wrap(bytes));
+ }
+
+ static RequestRecord fromRecord(Record record) {
+ return new SimpleRequestRecord(record);
+ }
+
+ <T extends Record> T readRecord(Supplier<T> clazz) throws IOException;
+
+ byte[] readBytes();
+
+ int limit();
+
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
new file mode 100644
index 000000000..a1c78ddad
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *uuuuu
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+
+public class SimpleRequestRecord implements RequestRecord {
+
+ private final Record record;
+
+ private volatile byte[] bytes;
+
+ public SimpleRequestRecord(Record record) {
+ this.record = record;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends Record> T readRecord(Supplier<T> constructor) {
+ return (T) record;
+ }
+
+ @SuppressFBWarnings("EI_EXPOSE_REP")
+ @Override
+ public byte[] readBytes() {
+ if (bytes != null) {
+ return bytes;
+ }
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ record.serialize(boa, "request");
+ bytes = baos.toByteArray();
+ return bytes;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public int limit() {
+ byte[] bytes = readBytes();
+ return ByteBuffer.wrap(bytes).limit();
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 817e84b3e..f6c2b93eb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -22,7 +22,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import javax.security.sasl.SaslException;
-import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
@@ -276,7 +274,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
// Connection throttling
- private BlueThrottle connThrottle = new BlueThrottle();
+ private final BlueThrottle connThrottle = new BlueThrottle();
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
"Internally the throttler has a BlockingQueue so "
@@ -290,17 +288,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* too many large requests such that the JVM runs out of usable heap and
* ultimately crashes.
*
- * The limit is enforced by the {@link checkRequestSize(int, boolean)}
+ * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
* method which is called by the connection layer ({@link NIOServerCnxn},
* {@link NettyServerCnxn}) before allocating a byte buffer and pulling
* data off the TCP socket. The limit is then checked again by the
- * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which
- * also atomically updates {@link currentLargeRequestBytes}. The request is
+ * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which
+ * also atomically updates {@link #currentLargeRequestBytes}. The request is
* then marked as a large request, with the request size stored in the Request
- * object so that it can later be decremented from {@link currentLargeRequestsBytes}.
+ * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
*
* When a request is completed or dropped, the relevant code path calls the
- * {@link requestFinished(Request)} method which performs the decrement if
+ * {@link #requestFinished(Request)} method which performs the decrement if
* needed.
*/
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
@@ -313,7 +311,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
- private AuthenticationHelper authHelper;
+ private final AuthenticationHelper authHelper = new AuthenticationHelper();
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
@@ -329,7 +327,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
listener = new ZooKeeperServerListenerImpl(this);
serverStats = new ServerStats(this);
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
- this.authHelper = new AuthenticationHelper();
}
/**
@@ -371,8 +368,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
this.initLargeRequestThrottlingSettings();
- this.authHelper = new AuthenticationHelper();
-
LOG.info(
"Created server with"
+ " tickTime {} ms"
@@ -1015,10 +1010,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
- ByteBuffer to = ByteBuffer.allocate(4);
- to.putInt(timeout);
+ CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
- Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+ Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
@@ -1595,13 +1589,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
- public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
- // We have the request, now process and setup for next
- InputStream bais = new ByteBufferInputStream(incomingBuffer);
- BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
- RequestHeader h = new RequestHeader();
- h.deserialize(bia, "header");
-
+ public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
@@ -1613,14 +1601,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
- // Through the magic of byte buffers, txn will not be
- // pointing
- // to the start of the txn
- incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
- AuthPacket authPacket = new AuthPacket();
- ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
+ AuthPacket authPacket = request.readRecord(AuthPacket::new);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
@@ -1660,15 +1643,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
return;
} else if (h.getType() == OpCode.sasl) {
- processSasl(incomingBuffer, cnxn, h);
+ processSasl(request, cnxn, h);
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
// Authentication enforcement is failed
// Already sent response to user about failure and closed the session, lets return
return;
} else {
- Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
- int length = incomingBuffer.limit();
+ Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
+ int length = request.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
@@ -1706,10 +1689,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
}
- private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
+ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
LOG.debug("Responding to client SASL token.");
- GetSASLRequest clientTokenRecord = new GetSASLRequest();
- ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
+ GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new);
byte[] clientToken = clientTokenRecord.getToken();
LOG.debug("Size of client SASL token: {}", clientToken.length);
byte[] responseToken = null;
@@ -2180,8 +2162,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
switch (request.type) {
case OpCode.create:
case OpCode.create2: {
- CreateRequest req = new CreateRequest();
- if (readRequestRecord(request, req)) {
+ CreateRequest req = request.readRequestRecordNoException(CreateRequest::new);
+ if (req != null) {
mustCheckACL = true;
acl = req.getAcl();
path = parentPath(req.getPath());
@@ -2189,22 +2171,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
break;
}
case OpCode.delete: {
- DeleteRequest req = new DeleteRequest();
- if (readRequestRecord(request, req)) {
+ DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new);
+ if (req != null) {
path = parentPath(req.getPath());
}
break;
}
case OpCode.setData: {
- SetDataRequest req = new SetDataRequest();
- if (readRequestRecord(request, req)) {
+ SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new);
+ if (req != null) {
path = req.getPath();
}
break;
}
case OpCode.setACL: {
- SetACLRequest req = new SetACLRequest();
- if (readRequestRecord(request, req)) {
+ SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new);
+ if (req != null) {
mustCheckACL = true;
acl = req.getAcl();
path = req.getPath();
@@ -2298,15 +2280,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return err == KeeperException.Code.OK.intValue();
}
- private boolean readRequestRecord(Request request, Record record) {
- try {
- request.readRequestRecord(record);
- return true;
- } catch (IOException ex) {
- return false;
- }
- }
-
public int getOutstandingHandshakeNum() {
if (serverCnxnFactory instanceof NettyServerCnxnFactory) {
return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index b6eeb758a..1818bf9bb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -804,9 +804,7 @@ public class Learner {
continue;
}
packetsCommitted.remove();
- Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
- request.setTxn(p.rec);
- request.setHdr(p.hdr);
+ Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
request.setTxnDigest(p.digest);
ozk.commitRequest(request);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 259d9fff9..eea11d33a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -43,6 +43,7 @@ import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.TxnLogProposalIterator;
import org.apache.zookeeper.server.ZKDatabase;
@@ -702,9 +703,9 @@ public class LearnerHandler extends ZooKeeperThread {
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
- si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
+ si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
} else {
- si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
+ si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
index d4c83aeab..6892d3dd8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
@@ -18,17 +18,17 @@
package org.apache.zookeeper.server.quorum;
-import java.nio.ByteBuffer;
import java.util.List;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
public class LearnerSyncRequest extends Request {
LearnerHandler fh;
public LearnerSyncRequest(
- LearnerHandler fh, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
- super(null, sessionId, xid, type, bb, authInfo);
+ LearnerHandler fh, long sessionId, int xid, int type, RequestRecord request, List<Id> authInfo) {
+ super(null, sessionId, xid, type, request, authInfo);
this.fh = fh;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index 2f24347b7..240936956 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -20,7 +20,6 @@ package org.apache.zookeeper.server.quorum;
import java.io.IOException;
import java.io.PrintWriter;
-import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -32,10 +31,12 @@ import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.CreateSessionTxn;
/**
* Abstract base class for all ZooKeeperServers that participate in
@@ -76,8 +77,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
}
if (OpCode.multi == request.type) {
- MultiOperationRecord multiTransactionRecord = new MultiOperationRecord();
- request.readRequestRecord(multiTransactionRecord);
+ MultiOperationRecord multiTransactionRecord = request.readRequestRecord(MultiOperationRecord::new);
boolean containsEphemeralCreate = false;
for (Op op : multiTransactionRecord) {
if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
@@ -93,8 +93,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
return null;
}
} else {
- CreateRequest createRequest = new CreateRequest();
- request.readRequestRecord(createRequest);
+ CreateRequest createRequest = request.readRequestRecord(CreateRequest::new);
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (!createMode.isEphemeral()) {
return null;
@@ -116,9 +115,8 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
synchronized (upgradeableSessionTracker) {
if (upgradeableSessionTracker.isLocalSession(sessionId)) {
int timeout = upgradeableSessionTracker.upgradeSession(sessionId);
- ByteBuffer to = ByteBuffer.allocate(4);
- to.putInt(timeout);
- return new Request(null, sessionId, 0, OpCode.createSession, to, null);
+ CreateSessionTxn txn = new CreateSessionTxn(timeout);
+ return new Request(null, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
}
}
return null;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java
index df3b831a3..8a700bbdf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java
@@ -39,9 +39,9 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.audit.AuditEvent.Result;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.apache.zookeeper.test.LoggerTestTool;
@@ -290,9 +290,7 @@ public class Slf4JAuditLoggerTest extends QuorumPeerTestBase {
private String getUser() {
ServerCnxn next = getServerCnxn();
- Request request = new Request(next, -1, -1, -1, null,
- next.getAuthInfo());
- return request.getUsersForAudit();
+ return AuthUtil.getUsers(next.getAuthInfo());
}
private String getIp() {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
index f9fd6d8b5..0e6c1b58f 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
@@ -19,10 +19,12 @@
package org.apache.zookeeper.server;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -36,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.DeleteContainerRequest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
@@ -91,7 +94,7 @@ public class CreateContainerTest extends ClientBase {
Stat stat = createWithStatVerifyResult("/foo");
Stat childStat = createWithStatVerifyResult("/foo/child");
// Don't expect to get the same stats for different creates.
- assertFalse(stat.equals(childStat));
+ assertNotEquals(stat, childStat);
}
@SuppressWarnings("ConstantConditions")
@@ -224,7 +227,11 @@ public class CreateContainerTest extends ClientBase {
RequestProcessor processor = new RequestProcessor() {
@Override
public void processRequest(Request request) {
- queue.add(new String(request.readRequestBytes()));
+ try {
+ queue.add(request.readRequestRecord(DeleteContainerRequest::new).getPath());
+ } catch (IOException e) {
+ fail(e);
+ }
}
@Override
@@ -246,14 +253,13 @@ public class CreateContainerTest extends ClientBase {
containerManager.checkContainers();
return null;
});
- assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one");
- assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two");
- assertEquals(queue.size(), 0);
+ assertEquals("/one", queue.poll(5, TimeUnit.SECONDS));
+ assertEquals("/two", queue.poll(5, TimeUnit.SECONDS));
+ assertEquals(0, queue.size());
Thread.sleep(500);
- assertEquals(queue.size(), 0);
-
- assertEquals(queue.poll(5, TimeUnit.SECONDS), "/three");
- assertEquals(queue.poll(5, TimeUnit.SECONDS), "/four");
+ assertEquals(0, queue.size());
+ assertEquals("/three", queue.poll(5, TimeUnit.SECONDS));
+ assertEquals("/four", queue.poll(5, TimeUnit.SECONDS));
}
@Test
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java
index 0181d2ebe..4b47d6cbb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java
@@ -96,7 +96,7 @@ public class FinalRequestProcessorTest {
// Arrange
// Act
- Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList<Id>());
+ Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList<Id>());
processor.processRequest(r);
// Assert
@@ -109,7 +109,7 @@ public class FinalRequestProcessorTest {
testACLs.remove(2);
// Act
- Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList<Id>());
+ Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList<Id>());
processor.processRequest(r);
// Assert
@@ -123,7 +123,7 @@ public class FinalRequestProcessorTest {
authInfo.add(new Id("digest", "otheruser:somesecrethash"));
// Act
- Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+ Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
processor.processRequest(r);
// Assert
@@ -137,7 +137,7 @@ public class FinalRequestProcessorTest {
authInfo.add(new Id("digest", "user:secrethash"));
// Act
- Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+ Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
processor.processRequest(r);
// Assert
@@ -151,7 +151,7 @@ public class FinalRequestProcessorTest {
authInfo.add(new Id("digest", "adminuser:adminsecret"));
// Act
- Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+ Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
processor.processRequest(r);
// Assert
@@ -167,7 +167,7 @@ public class FinalRequestProcessorTest {
authInfo.add(new Id("digest", "adminuser:adminsecret"));
// Act
- Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+ Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
processor.processRequest(r);
// Assert
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
index 001e23635..682691010 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
@@ -117,7 +117,7 @@ public class MultiOpSessionUpgradeTest extends QuorumBase {
GetDataRequest getDataRequest = new GetDataRequest(path, false);
getDataRequest.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList<Id>());
+ return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList<Id>());
}
private Request makeCreateRequest(String path, long sessionId) throws IOException {
@@ -126,7 +126,7 @@ public class MultiOpSessionUpgradeTest extends QuorumBase {
CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
createRequest.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
+ return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList<Id>());
}
private QuorumZooKeeperServer getConnectedServer(long sessionId) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
index 062cae1f9..bfb0db5f3 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -29,15 +29,12 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
@@ -94,12 +91,7 @@ public class PrepRequestProcessorMetricsTest extends ZKTestCase {
}
private Request createRequest(Record record, int opCode) throws IOException {
- // encoding
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- record.serialize(boa, "request");
- baos.close();
- return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null);
+ return new Request(null, 1L, 0, opCode, RequestRecord.fromRecord(record), null);
}
private Request createRequest(String path, int opCode) throws IOException {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 9e7120569..d80c5e08e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -24,11 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.IOException;
import java.io.PrintWriter;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -37,7 +34,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -66,13 +62,10 @@ import org.apache.zookeeper.txn.ErrorTxn;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class PrepRequestProcessorTest extends ClientBase {
- private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorTest.class);
private static final int CONNECTION_TIMEOUT = 3000;
private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
private CountDownLatch pLatch;
@@ -120,34 +113,28 @@ public class PrepRequestProcessorTest extends ClientBase {
public void testPRequest() throws Exception {
pLatch = new CountDownLatch(1);
processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
- Request foo = new Request(null, 1L, 1, OpCode.create, ByteBuffer.allocate(3), null);
+ Request foo = new Request(null, 1L, 1, OpCode.create, RequestRecord.fromBytes(new byte[3]), null);
processor.pRequest(foo);
assertEquals(new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue()), outcome.getTxn(), "Request should have marshalling error");
assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain");
}
- private Request createRequest(Record record, int opCode) throws IOException {
+ private Request createRequest(Record record, int opCode) {
return createRequest(record, opCode, 1L);
}
- private Request createRequest(Record record, int opCode, long sessionId) throws IOException {
+ private Request createRequest(Record record, int opCode, long sessionId) {
return createRequest(record, opCode, sessionId, false);
}
- private Request createRequest(Record record, int opCode, boolean admin) throws IOException {
+ private Request createRequest(Record record, int opCode, boolean admin) {
return createRequest(record, opCode, 1L, admin);
}
- private Request createRequest(Record record, int opCode, long sessionId, boolean admin) throws IOException {
- // encoding
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- record.serialize(boa, "request");
- baos.close();
- // Id
- List<Id> ids = Arrays.asList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE);
- return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
+ private Request createRequest(Record record, int opCode, long sessionId, boolean admin) {
+ List<Id> ids = Collections.singletonList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE);
+ return new Request(null, sessionId, 0, opCode, RequestRecord.fromRecord(record), ids);
}
private void process(List<Op> ops) throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java
index 16c7a7dd1..681e8348b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java
@@ -19,7 +19,6 @@
package org.apache.zookeeper.server;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZKTestCase;
@@ -67,7 +66,7 @@ public class ZooKeeperCriticalThreadMetricsTest extends ZKTestCase {
PrepRequestProcessor processor = new MyPrepRequestProcessor();
processor.start();
- processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null));
+ processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null));
processed.await();
processor.shutdown();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index 1050a474b..e5f668d1f 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -43,6 +43,7 @@ import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.WorkerService;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.junit.jupiter.api.AfterEach;
@@ -129,7 +130,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
rec.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- return new Request(null, sessionId, xid, type, bb, new ArrayList<Id>());
+ return new Request(null, sessionId, xid, type, RequestRecord.fromBytes(bb), new ArrayList<Id>());
}
/**
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
index 4b67f5b50..4a4598355 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -31,6 +30,7 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.WorkerService;
import org.junit.jupiter.api.AfterEach;
@@ -192,11 +192,11 @@ public class CommitProcessorMetricsTest extends ZKTestCase {
}
private Request createReadRequest(long sessionId, int xid) {
- return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, ByteBuffer.wrap(new byte[10]), null);
+ return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, RequestRecord.fromBytes(new byte[10]), null);
}
private Request createWriteRequest(long sessionId, int xid) {
- return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null);
+ return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null);
}
private void processRequestWithWait(Request request) throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
index 46958d194..46a4c3874 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
@@ -42,6 +42,7 @@ import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
@@ -160,7 +161,7 @@ public class CommitProcessorTest extends ZKTestCase {
+ (++nodeId), new byte[0], Ids.OPEN_ACL_UNSAFE, 1);
createReq.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- Request req = new Request(null, sessionId, ++cxid, OpCode.create, bb, new ArrayList<Id>());
+ Request req = new Request(null, sessionId, ++cxid, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList<Id>());
zks.getFirstProcessor().processRequest(req);
}
@@ -174,7 +175,7 @@ public class CommitProcessorTest extends ZKTestCase {
+ nodeId, false);
getDataRequest.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- Request req = new Request(null, sessionId, ++cxid, OpCode.getData, bb, new ArrayList<Id>());
+ Request req = new Request(null, sessionId, ++cxid, OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList<Id>());
zks.getFirstProcessor().processRequest(req);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java
index ef1f12194..5216eb703 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java
@@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.net.SocketException;
-import java.nio.ByteBuffer;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
@@ -32,11 +31,13 @@ import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.txn.DeleteTxn;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -220,7 +221,8 @@ public class RaceConditionTest extends QuorumPeerTestBase {
* Add a request so that something is there for SyncRequestProcessor
* to process, while we are in shutdown flow
*/
- Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, ByteBuffer.wrap("/deadLockIssue".getBytes()), null);
+ DeleteTxn deleteTxn = new DeleteTxn("/deadLockIssue");
+ Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, RequestRecord.fromRecord(deleteTxn), null);
processRequest(request);
super.shutdown();
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
index 8803a73cb..3d60c8b91 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
@@ -40,6 +40,7 @@ import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
@@ -317,8 +318,7 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
}
if (request.type == ZooDefs.OpCode.create && request.cnxn != null) {
- CreateRequest createRequest = new CreateRequest();
- request.readRequestRecord(createRequest);
+ CreateRequest createRequest = request.readRequestRecord(CreateRequest::new);
try {
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isEphemeral()) {
@@ -355,7 +355,7 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
createRequest.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
+ return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList<Id>());
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
index 72bceafa3..17ccdae6e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
@@ -26,7 +26,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -65,7 +65,7 @@ public class SyncRequestProcessorMetricTest {
}
private Request createRquest(long sessionId, int xid) {
- return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null);
+ return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null);
}
@Test
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
index 99cd171c0..a619866d9 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -100,7 +101,7 @@ public class LeaderSessionTrackerTest extends ZKTestCase {
LOG.info("Fake session Id: {}", Long.toHexString(fakeSessionId));
- Request request = new Request(null, fakeSessionId, 0, OpCode.create, bb, new ArrayList<Id>());
+ Request request = new Request(null, fakeSessionId, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList<Id>());
// Submit request directly to leader
leader.getActiveServer().submitRequest(request);
@@ -138,7 +139,7 @@ public class LeaderSessionTrackerTest extends ZKTestCase {
LOG.info("Local session Id: {}", Long.toHexString(locallSession));
- Request request = new Request(null, locallSession, 0, OpCode.create, bb, new ArrayList<Id>());
+ Request request = new Request(null, locallSession, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList<Id>());
// Submit request directly to leader
leader.getActiveServer().submitRequest(request);