summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortison <wander4096@gmail.com>2022-07-12 13:03:07 +0200
committerMate Szalay-Beko <symat@apache.com>2022-07-12 13:03:07 +0200
commita7e4dea7abccff018d123d54dd5d3ccc1544484e (patch)
tree0513ab42a50da1974057a267d352eba56ef908d5
parentde7c5869d372e46af43979134d0e30b49d2319b1 (diff)
downloadzookeeper-a7e4dea7abccff018d123d54dd5d3ccc1544484e.tar.gz
ZOOKEEPER-4573: Encapsulate request bytebuffer in Request
This patch is based on #1903. This closes #1903. Author: tison <wander4096@gmail.com> Reviewers: Andor Molnar <andor@apache.org>, Mate Szalay-Beko <symat@apache.org> Closes #1904 from tisonkun/encapsulate-request-bytebuffer
-rw-r--r--zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java18
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java2
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java4
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java7
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java12
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java45
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java7
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java5
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java42
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java25
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java49
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java10
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java9
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java14
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java8
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java22
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java36
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java7
18 files changed, 156 insertions, 166 deletions
diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java
index c4d1bbebf..ae1310af0 100644
--- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java
+++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java
@@ -47,27 +47,27 @@ public class BinaryInputArchive implements InputArchive {
}
}
- private DataInput in;
- private int maxBufferSize;
- private int extraMaxBufferSize;
+ private final DataInput in;
+ private final int maxBufferSize;
+ private final int extraMaxBufferSize;
- public static BinaryInputArchive getArchive(InputStream strm) {
- return new BinaryInputArchive(new DataInputStream(strm));
+ public static BinaryInputArchive getArchive(InputStream stream) {
+ return new BinaryInputArchive(new DataInputStream(stream));
}
private static class BinaryIndex implements Index {
- private int nelems;
+ private int n;
BinaryIndex(int nelems) {
- this.nelems = nelems;
+ this.n = nelems;
}
public boolean done() {
- return (nelems <= 0);
+ return (n <= 0);
}
public void incr() {
- nelems--;
+ n--;
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
index 35af4a2f1..17c0ad279 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
@@ -141,7 +141,7 @@ abstract class ClientCnxnSocket {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia);
- if (protocolManager.isReadonlyAvailable()) {
+ if (!protocolManager.isReadonlyAvailable()) {
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
index b98c42d25..5aca1711f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
@@ -32,7 +32,6 @@ import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.Request;
import org.slf4j.Logger;
@@ -127,8 +126,7 @@ public final class AuditHelper {
}
private static void deserialize(Request request, Record record) throws IOException {
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request.slice(), record);
+ request.readRequestRecord(record);
}
private static Result getResult(ProcessTxnResult rc, boolean failedTxn) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java
index 9a93abd61..8fe8b6fc9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java
@@ -21,12 +21,13 @@ package org.apache.zookeeper.server;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
public class ByteBufferInputStream extends InputStream {
- ByteBuffer bb;
+ private final ByteBuffer bb;
public ByteBufferInputStream(ByteBuffer bb) {
this.bb = bb;
@@ -46,7 +47,7 @@ public class ByteBufferInputStream extends InputStream {
}
@Override
- public int read(byte[] b, int off, int len) throws IOException {
+ public int read(@Nonnull byte[] b, int off, int len) throws IOException {
if (bb.remaining() == 0) {
return -1;
}
@@ -58,7 +59,7 @@ public class ByteBufferInputStream extends InputStream {
}
@Override
- public int read(byte[] b) throws IOException {
+ public int read(@Nonnull byte[] b) throws IOException {
return read(b, 0, b.length);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
index 2531cbaff..35a528cdb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
@@ -21,27 +21,33 @@ package org.apache.zookeeper.server;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
public class ByteBufferOutputStream extends OutputStream {
- ByteBuffer bb;
+ private final ByteBuffer bb;
+
public ByteBufferOutputStream(ByteBuffer bb) {
this.bb = bb;
}
+
@Override
public void write(int b) throws IOException {
bb.put((byte) b);
}
+
@Override
- public void write(byte[] b) throws IOException {
+ public void write(@Nonnull byte[] b) throws IOException {
bb.put(b);
}
+
@Override
- public void write(byte[] b, int off, int len) throws IOException {
+ public void write(@Nonnull byte[] b, int off, int len) throws IOException {
bb.put(b, off, len);
}
+
public static void record2ByteBuffer(Record record, ByteBuffer bb) throws IOException {
BinaryOutputArchive oa;
oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index e73f85d7c..bc5b019f8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -20,7 +20,6 @@ package org.apache.zookeeper.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -271,7 +270,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
- ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
+ request.readRequestRecord(multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
@@ -350,7 +349,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
+ request.readRequestRecord(syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
@@ -365,7 +364,7 @@ public class FinalRequestProcessor implements RequestProcessor {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
+ request.readRequestRecord(existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
@@ -378,7 +377,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
+ request.readRequestRecord(getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
@@ -387,9 +386,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
- // TODO we really should not need this
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
+ request.readRequestRecord(setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
@@ -405,9 +402,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.setWatches2: {
lastOp = "STW2";
SetWatches2 setWatches = new SetWatches2();
- // TODO we really should not need this
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
+ request.readRequestRecord(setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
@@ -421,8 +416,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.addWatch: {
lastOp = "ADDW";
AddWatchRequest addWatcherRequest = new AddWatchRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request,
- addWatcherRequest);
+ request.readRequestRecord(addWatcherRequest);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
@@ -430,7 +424,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
+ request.readRequestRecord(getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
@@ -473,7 +467,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
+ request.readRequestRecord(getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
@@ -482,7 +476,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
+ request.readRequestRecord(getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
@@ -502,7 +496,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
- ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
+ request.readRequestRecord(getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
@@ -524,7 +518,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
+ request.readRequestRecord(checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
@@ -538,7 +532,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
+ request.readRequestRecord(removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
@@ -557,7 +551,7 @@ public class FinalRequestProcessor implements RequestProcessor {
case OpCode.getEphemerals: {
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
+ request.readRequestRecord(getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
@@ -592,10 +586,13 @@ public class FinalRequestProcessor implements RequestProcessor {
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
- ByteBuffer bb = request.request;
- bb.rewind();
- while (bb.hasRemaining()) {
- sb.append(String.format("%02x", (0xff & bb.get())));
+ byte[] payload = request.readRequestBytes();
+ if (payload != null) {
+ for (byte b : payload) {
+ sb.append(String.format("%02x", (0xff & b)));
+ }
+ } else {
+ sb.append("request buffer is null");
}
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
err = Code.MARSHALLINGERROR;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index dd62154fc..83e7491e5 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -40,6 +40,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
@@ -427,11 +428,13 @@ public class NIOServerCnxn extends ServerCnxn {
}
}
- private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
+ private void readConnectRequest() throws IOException, ClientCnxnLimitException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
- zkServer.processConnectRequest(this, incomingBuffer);
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
+ ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
+ zkServer.processConnectRequest(this, request);
initialized = true;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 8937039bc..ae482ce2b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -45,6 +45,7 @@ import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.command.CommandExecutor;
@@ -482,7 +483,9 @@ public class NettyServerCnxn extends ServerCnxn {
zks.processPacket(this, bb);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
- zks.processConnectRequest(this, bb);
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+ ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
+ zks.processConnectRequest(this, request);
initialized = true;
}
bb = null;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index f83692723..9733a48ac 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -332,7 +332,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
break;
}
case OpCode.deleteContainer: {
- String path = new String(request.request.array(), UTF_8);
+ String path = new String(request.readRequestBytes(), UTF_8);
String parentPath = getParentPathAndValidate(path);
ChangeRecord nodeRecord = getRecordForPath(path);
if (nodeRecord.childCount > 0) {
@@ -360,7 +360,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
DeleteRequest deleteRequest = (DeleteRequest) record;
if (deserialize) {
- ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
+ request.readRequestRecord(deleteRequest);
}
String path = deleteRequest.getPath();
String parentPath = getParentPathAndValidate(path);
@@ -388,7 +388,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest) record;
if (deserialize) {
- ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
+ request.readRequestRecord(setDataRequest);
}
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
@@ -560,7 +560,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetACLRequest setAclRequest = (SetACLRequest) record;
if (deserialize) {
- ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
+ request.readRequestRecord(setAclRequest);
}
path = setAclRequest.getPath();
validatePath(path, request.sessionId);
@@ -577,12 +577,11 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
- request.request.rewind();
- int to = request.request.getInt();
- request.setTxn(new CreateSessionTxn(to));
- request.request.rewind();
+ CreateSessionTxn createSessionTxn = new CreateSessionTxn();
+ request.readRequestRecord(createSessionTxn);
+ request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
- zks.sessionTracker.trackSession(request.sessionId, to);
+ zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());
break;
case OpCode.closeSession:
@@ -632,7 +631,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
if (deserialize) {
- ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
+ request.readRequestRecord(checkVersionRequest);
}
path = checkVersionRequest.getPath();
validatePath(path, request.sessionId);
@@ -656,7 +655,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
if (deserialize) {
- ByteBufferInputStream.byteBuffer2Record(request.request, record);
+ request.readRequestRecord(record);
}
int flags;
@@ -786,10 +785,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
/**
* This method is a helper to pRequest method
- *
- * @param request
*/
- private void pRequestHelper(Request request) throws RequestProcessorException {
+ private void pRequestHelper(Request request) {
try {
switch (request.type) {
case OpCode.createContainer:
@@ -813,7 +810,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
+ request.readRequestRecord(reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
@@ -827,12 +824,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.multi:
MultiOperationRecord multiRequest = new MultiOperationRecord();
try {
- ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
+ request.readRequestRecord(multiRequest);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
- List<Txn> txns = new ArrayList<Txn>();
+ List<Txn> txns = new ArrayList<>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;
@@ -947,18 +944,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
-
StringBuilder sb = new StringBuilder();
- ByteBuffer bb = request.request;
- if (bb != null) {
- bb.rewind();
- while (bb.hasRemaining()) {
- sb.append(String.format("%02x", (0xff & bb.get())));
+ byte[] payload = request.readRequestBytes();
+ if (payload != null) {
+ for (byte b : payload) {
+ sb.append(String.format("%02x", (0xff & b)));
}
} else {
sb.append("request buffer is null");
}
-
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 41e3d7fcd..1aee6aee2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -19,6 +19,7 @@
package org.apache.zookeeper.server;
import static java.nio.charset.StandardCharsets.UTF_8;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.jute.Record;
@@ -78,7 +79,29 @@ public class Request {
public final int type;
- public final ByteBuffer request;
+ private final ByteBuffer request;
+
+ public void readRequestRecord(Record record) throws IOException {
+ if (request != null) {
+ request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request, record);
+ request.rewind();
+ return;
+ }
+ throw new IOException(new NullPointerException("request"));
+ }
+
+ public byte[] readRequestBytes() {
+ if (request != null) {
+ request.rewind();
+ int len = request.remaining();
+ byte[] b = new byte[len];
+ request.get(b);
+ request.rewind();
+ return b;
+ }
+ return null;
+ }
public final ServerCnxn cnxn;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 4260913b7..0303ca645 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1375,17 +1375,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
- public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
- throws IOException, ClientCnxnLimitException {
-
- BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
- ConnectRequest connReq = cnxn.protocolManager.deserializeConnectRequest(bia);
+ public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
LOG.debug(
"Session establishment request from client {} client's lastZxid is 0x{}",
cnxn.getRemoteSocketAddress(),
- Long.toHexString(connReq.getLastZxidSeen()));
+ Long.toHexString(request.getLastZxidSeen()));
- long sessionId = connReq.getSessionId();
+ long sessionId = request.getSessionId();
int tokensNeeded = 1;
if (connThrottle.isConnectionWeightEnabled()) {
if (sessionId == 0) {
@@ -1405,22 +1401,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
- if (cnxn.protocolManager.isReadonlyAvailable()) {
+ if (!cnxn.protocolManager.isReadonlyAvailable()) {
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
- if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
+ if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
- if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
+ if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
- + Long.toHexString(connReq.getLastZxidSeen())
+ + Long.toHexString(request.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
@@ -1428,8 +1424,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
- int sessionTimeout = connReq.getTimeOut();
- byte[] passwd = connReq.getPasswd();
+ int sessionTimeout = request.getTimeOut();
+ byte[] passwd = request.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
@@ -1447,16 +1443,16 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.debug(
"Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
- Long.toHexString(connReq.getLastZxidSeen()),
- connReq.getTimeOut(),
+ Long.toHexString(request.getLastZxidSeen()),
+ request.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
validateSession(cnxn, sessionId);
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(sessionId),
- Long.toHexString(connReq.getLastZxidSeen()),
- connReq.getTimeOut(),
+ Long.toHexString(request.getLastZxidSeen()),
+ request.getTimeOut(),
cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
@@ -2182,7 +2178,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
case OpCode.create:
case OpCode.create2: {
CreateRequest req = new CreateRequest();
- if (buffer2Record(request.request, req)) {
+ if (readRequestRecord(request, req)) {
mustCheckACL = true;
acl = req.getAcl();
path = parentPath(req.getPath());
@@ -2191,21 +2187,21 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
case OpCode.delete: {
DeleteRequest req = new DeleteRequest();
- if (buffer2Record(request.request, req)) {
+ if (readRequestRecord(request, req)) {
path = parentPath(req.getPath());
}
break;
}
case OpCode.setData: {
SetDataRequest req = new SetDataRequest();
- if (buffer2Record(request.request, req)) {
+ if (readRequestRecord(request, req)) {
path = req.getPath();
}
break;
}
case OpCode.setACL: {
SetACLRequest req = new SetACLRequest();
- if (buffer2Record(request.request, req)) {
+ if (readRequestRecord(request, req)) {
mustCheckACL = true;
acl = req.getAcl();
path = req.getPath();
@@ -2299,16 +2295,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return err == KeeperException.Code.OK.intValue();
}
- private boolean buffer2Record(ByteBuffer request, Record record) {
- boolean rv = false;
+ private boolean readRequestRecord(Request request, Record record) {
try {
- ByteBufferInputStream.byteBuffer2Record(request, record);
- request.rewind();
- rv = true;
+ request.readRequestRecord(record);
+ return true;
} catch (IOException ex) {
+ return false;
}
-
- return rv;
}
public int getOutstandingHandshakeNum() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 594c87fb9..b6eeb758a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -254,13 +254,9 @@ public class Learner {
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
- if (request.request != null) {
- request.request.rewind();
- int len = request.request.remaining();
- byte[] b = new byte[len];
- request.request.get(b);
- request.request.rewind();
- oa.write(b);
+ byte[] payload = request.readRequestBytes();
+ if (payload != null) {
+ oa.write(payload);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index f27ce8274..2f24347b7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -31,7 +31,6 @@ import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
@@ -78,9 +77,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
if (OpCode.multi == request.type) {
MultiOperationRecord multiTransactionRecord = new MultiOperationRecord();
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord);
- request.request.rewind();
+ request.readRequestRecord(multiTransactionRecord);
boolean containsEphemeralCreate = false;
for (Op op : multiTransactionRecord) {
if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
@@ -97,9 +94,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
}
} else {
CreateRequest createRequest = new CreateRequest();
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
- request.request.rewind();
+ request.readRequestRecord(createRequest);
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (!createMode.isEphemeral()) {
return null;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
index 6722473f2..f9fd6d8b5 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -221,11 +220,11 @@ public class CreateContainerTest extends ClientBase {
@Test
@Timeout(value = 30)
public void testMaxPerMinute() throws InterruptedException {
- final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+ final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
RequestProcessor processor = new RequestProcessor() {
@Override
public void processRequest(Request request) {
- queue.add(new String(request.request.array()));
+ queue.add(new String(request.readRequestBytes()));
}
@Override
@@ -243,12 +242,9 @@ public class CreateContainerTest extends ClientBase {
return Arrays.asList("/one", "/two", "/three", "/four");
}
};
- Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- containerManager.checkContainers();
- return null;
- }
+ Executors.newSingleThreadExecutor().submit(() -> {
+ containerManager.checkContainers();
+ return null;
});
assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one");
assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two");
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java
index ac46b4e0f..6c22091f4 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java
@@ -18,10 +18,7 @@
package org.apache.zookeeper.server;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.nio.ByteBuffer;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
@@ -51,10 +48,7 @@ public class ZooKeeperServerCreationTest {
ServerCnxn cnxn = new MockServerCnxn();
ConnectRequest connReq = new ConnectRequest();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- connReq.serialize(boa, "connect");
- zks.processConnectRequest(cnxn, ByteBuffer.wrap(baos.toByteArray()));
+ zks.processConnectRequest(cnxn, connReq);
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
index 4d41dac1a..e9f9b7db5 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
@@ -26,12 +26,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.SnapStream;
@@ -150,21 +150,17 @@ public class ZooKeeperServerTest extends ZKTestCase {
final ZKDatabase zkDatabase = new ZKDatabase(mock(FileTxnSnapLog.class));
zooKeeperServer.setZKDatabase(zkDatabase);
- final ByteBuffer output = ByteBuffer.allocate(30);
- // serialize a connReq
- output.putInt(1);
- // lastZxid
- output.putLong(99L);
- output.putInt(500);
- output.putLong(123L);
- output.putInt(1);
- output.put((byte) 1);
- output.put((byte) 1);
- output.flip();
+ final ConnectRequest request = new ConnectRequest();
+ request.setProtocolVersion(1);
+ request.setLastZxidSeen(99L);
+ request.setTimeOut(500);
+ request.setSessionId(123L);
+ request.setPasswd(new byte[]{ 1 });
+ request.setReadOnly(true);
ServerCnxn.CloseRequestException e = assertThrows(
ServerCnxn.CloseRequestException.class,
- () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), output));
+ () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), request));
assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java
index 3ee6016f0..298b5d1d9 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java
@@ -21,7 +21,7 @@ package org.apache.zookeeper.server.quorum;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
-import java.nio.ByteBuffer;
+import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.server.MockServerCnxn;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZKDatabase;
@@ -35,28 +35,26 @@ import org.junit.jupiter.api.Test;
public class ReadOnlyZooKeeperServerTest {
/**
- * test method {@link ZooKeeperServer#processConnectRequest(org.apache.zookeeper.server.ServerCnxn, java.nio.ByteBuffer)}
+ * test method {@link ZooKeeperServer#processConnectRequest(ServerCnxn, ConnectRequest)}
*/
@Test
public void testReadOnlyZookeeperServer() {
ReadOnlyZooKeeperServer readOnlyZooKeeperServer = new ReadOnlyZooKeeperServer(
- mock(FileTxnSnapLog.class), mock(QuorumPeer.class), mock(ZKDatabase.class));
-
- final ByteBuffer output = ByteBuffer.allocate(30);
- // serialize a connReq
- output.putInt(1);
- output.putLong(1L);
- output.putInt(500);
- output.putLong(123L);
- output.putInt(1);
- output.put((byte) 1);
- // set readOnly false
- output.put((byte) 0);
- output.flip();
-
- ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> {
- readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), output);
- });
+ mock(FileTxnSnapLog.class),
+ mock(QuorumPeer.class),
+ mock(ZKDatabase.class));
+
+ final ConnectRequest request = new ConnectRequest();
+ request.setProtocolVersion(1);
+ request.setLastZxidSeen(99L);
+ request.setTimeOut(500);
+ request.setSessionId(123L);
+ request.setPasswd(new byte[]{ 1 });
+ request.setReadOnly(false);
+
+ ServerCnxn.CloseRequestException e = assertThrows(
+ ServerCnxn.CloseRequestException.class,
+ () -> readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), request));
assertEquals(e.getReason(), ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
index 2131158d8..8803a73cb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
@@ -39,7 +39,6 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
@@ -319,15 +318,13 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
if (request.type == ZooDefs.OpCode.create && request.cnxn != null) {
CreateRequest createRequest = new CreateRequest();
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
- request.request.rewind();
+ request.readRequestRecord(createRequest);
try {
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isEphemeral()) {
request.cnxn.sendCloseSession();
}
- } catch (KeeperException e) {
+ } catch (KeeperException ignore) {
}
return;
}