summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorli4wang <68786536+li4wang@users.noreply.github.com>2023-01-04 07:40:32 -0800
committerGitHub <noreply@github.com>2023-01-04 16:40:32 +0100
commitb069edeb2436a051ad2b18e03fd67ae8ec23875c (patch)
tree3f2bcacab8b3b1ad49cbea15dbb93b3b73c7bd25
parent9a43bc95aa8c95808d585b7aafb375a1e438a581 (diff)
downloadzookeeper-b069edeb2436a051ad2b18e03fd67ae8ec23875c.tar.gz
ZOOKEEPER-4570: Admin server API for taking snapshot and stream out data (#1943)
Provides a snapshot command for taking snapshot and streaming out data Author: Li Wang <liwang@apple.com> Co-authored-by: Li Wang <liwang@apple.com> Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
-rw-r--r--zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md51
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java36
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java12
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java52
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java6
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java78
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java102
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java26
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java6
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java52
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java17
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java10
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java4
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java60
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java103
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java4
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java55
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java3
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java3
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java7
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java93
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java6
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java257
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java8
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java12
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java62
26 files changed, 1073 insertions, 52 deletions
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 8e19f0418..b4c01d35d 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1208,7 +1208,32 @@ property, when available, is noted below.
The default value is false.
-
+* *serializeLastProcessedZxid.enabled*
+ (Jave system property: **zookeeper.serializeLastProcessedZxid.enabled**)
+ **New in 3.9.0:**
+ If enabled, ZooKeeper serializes the lastProcessedZxid when snapshot and deserializes it
+ when restore. Defaults to true. Needs to be enabled for performing snapshot and restore
+ via admin server commands, as there is no snapshot file name to extract the lastProcessedZxid.
+
+ This feature is backward and forward compatible. Here are the different scenarios.
+
+ 1. Snapshot triggered by server internally
+ a. When loading old snapshot with new code, it will throw EOFException when trying to
+ read the non-exist lastProcessedZxid value, and the exception will be caught.
+ The lastProcessedZxid will be set using the snapshot file name.
+
+ b. When loading new snapshot with old code, it will finish successfully after deserializing the
+ digest value, the lastProcessedZxid at the end of snapshot file will be ignored.
+ The lastProcessedZxid will be set using the snapshot file name.
+
+ 2. Sync up between leader and follower
+ The lastProcessedZxid will not be serialized by leader and deserialized by follower
+ in both new and old code. It will be set to the lastProcessedZxid sent from leader
+ via QuorumPacket.
+
+ 3. Snapshot triggered via admin server APIs
+ The feature flag need to be enabled for the snapshot command to work.
+
<a name="sc_clusterOptions"></a>
#### Cluster Options
@@ -2087,6 +2112,20 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
#### AdminServer configuration
+**New in 3.9.0:** The following
+options are used to configure the [AdminServer](#sc_adminserver).
+
+* *admin.snapshot.enabled* :
+ (Java system property: **zookeeper.admin.snapshot.enabled**)
+ The flag for enabling the snapshot command. Defaults to false.
+ It will be enabled by default once the auth support for admin server commands
+ is available.
+
+* *admin.snapshot.intervalInMS* :
+ (Java system property: **zookeeper.admin.snapshot.intervalInMS**)
+ The time interval for rate limiting snapshot command to protect the server.
+ Defaults to 5 mins.
+
**New in 3.7.1:** The following
options are used to configure the [AdminServer](#sc_adminserver).
@@ -2620,6 +2659,16 @@ Available commands include:
Server information.
Returns multiple fields giving a brief overview of server state.
+* *snapshot/snap* :
+ Takes a snapshot of the current server in the datadir and stream out data.
+ Optional query parameter:
+ "streaming": Boolean (defaults to true if the parameter is not present)
+ Returns the following via Http headers:
+ "last_zxid": String
+ "snapshot_size": String
+ Note: this API is rate-limited (once every 5 mins by default) to protect the server
+ from being over-loaded.
+
* *stats/stat* :
Same as *server_stats* but also returns the "connections" field (see *connections*
for details).
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index 2818e15aa..cd45a7c34 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -1745,6 +1745,42 @@ public class DataTree {
}
/**
+ * Serializes the lastProcessedZxid so we can get it from snapshot instead the snapshot file name.
+ * This is needed for performing snapshot and restore via admin server commands.
+ *
+ * @param oa the output stream to write to
+ * @return true if the lastProcessedZxid is serialized successfully, otherwise false
+ * @throws IOException if there is an I/O error
+ */
+ public boolean serializeLastProcessedZxid(final OutputArchive oa) throws IOException {
+ if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) {
+ return false;
+ }
+ oa.writeLong(lastProcessedZxid, "lastZxid");
+ return true;
+ }
+
+ /**
+ * Deserializes the lastProcessedZxid from the input stream and updates the lastProcessedZxid field.
+ *
+ * @param ia the input stream to read from
+ * @return true if lastProcessedZxid is deserialized successfully, otherwise false
+ * @throws IOException if there is an I/O error
+ */
+ public boolean deserializeLastProcessedZxid(final InputArchive ia) throws IOException {
+ if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) {
+ return false;
+ }
+ try {
+ lastProcessedZxid = ia.readLong("lastZxid");
+ } catch (final EOFException e) {
+ LOG.warn("Got EOFException while reading the last processed zxid, likely due to reading an older snapshot.");
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Compares the actual tree's digest with that in the snapshot.
* Resets digestFromLoadedSnapshot after comparision.
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index af156542f..ef28e32e1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -72,6 +72,8 @@ public final class ServerMetrics {
FSYNC_TIME = metricsContext.getSummary("fsynctime", DetailLevel.BASIC);
SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
+ SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
+ SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
@@ -277,6 +279,16 @@ public final class ServerMetrics {
public final Summary SNAPSHOT_TIME;
/**
+ * Snapshot error count
+ */
+ public final Counter SNAPSHOT_ERROR_COUNT;
+
+ /**
+ * Snapshot rate limited count
+ */
+ public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
+
+ /**
* Db init time (snapshot loading + txnlog replay)
*/
public final Summary DB_INIT_TIME;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index f6c2b93eb..d61f269c9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -120,6 +120,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
private static boolean digestEnabled;
+ public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
+ private static boolean serializeLastProcessedZxidEnabled;
+
// Add a enable/disable option for now, we should remove this one when
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
@@ -153,6 +156,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+
+ setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
+ System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
}
// @VisibleForTesting
@@ -535,23 +541,46 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot() {
+ public void takeSnapshot() throws IOException {
takeSnapshot(false);
}
- public void takeSnapshot(boolean syncSnap) {
+ public void takeSnapshot(boolean syncSnap) throws IOException {
+ takeSnapshot(syncSnap, true, false);
+ }
+
+ /**
+ * Takes a snapshot on the server.
+ *
+ * @param syncSnap syncSnap sync the snapshot immediately after write
+ * @param isSevere if true system exist, otherwise throw IOException
+ * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions
+ *
+ * @return file snapshot file object
+ * @throws IOException
+ */
+ public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
long start = Time.currentElapsedTime();
+ File snapFile = null;
try {
- txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
+ if (fastForwardFromEdits) {
+ zkDb.fastForwardDataBase();
+ }
+ snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
- LOG.error("Severe unrecoverable error, exiting", e);
- // This is a severe error that we cannot recover from,
- // so we need to exit
- ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+ if (isSevere) {
+ LOG.error("Severe unrecoverable error, exiting", e);
+ // This is a severe error that we cannot recover from,
+ // so we need to exit
+ ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+ } else {
+ throw e;
+ }
}
long elapsed = Time.currentElapsedTime() - start;
LOG.info("Snapshot taken in {} ms", elapsed);
ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
+ return snapFile;
}
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
@@ -2139,6 +2168,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
ZooKeeperServer.digestEnabled = digestEnabled;
}
+ public static boolean isSerializeLastProcessedZxidEnabled() {
+ return serializeLastProcessedZxidEnabled;
+ }
+
+ public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
+ serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
+ LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled);
+ }
+
/**
* Trim a path to get the immediate predecessor.
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
index a8fe8bdc0..b9a87830e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
@@ -18,6 +18,7 @@
package org.apache.zookeeper.server.admin;
+import java.io.OutputStream;
import java.io.PrintWriter;
/**
@@ -31,6 +32,9 @@ public interface CommandOutputter {
/** The MIME type of this output (e.g., "application/json") */
String getContentType();
- void output(CommandResponse response, PrintWriter pw);
+ /** Print out data as output */
+ default void output(CommandResponse response, PrintWriter pw) {}
+ /** Stream out data as output */
+ default void output(final CommandResponse response, final OutputStream os) {}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
index d9e7239a1..f47b4e942 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
@@ -17,8 +17,11 @@
package org.apache.zookeeper.server.admin;
+import java.io.InputStream;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
/**
* A response from running a {@link Command}.
@@ -37,6 +40,9 @@ public class CommandResponse {
private final String command;
private final String error;
private final Map<String, Object> data;
+ private final Map<String, String> headers;
+ private int statusCode;
+ private InputStream inputStream;
/**
* Creates a new response with no error string.
@@ -44,18 +50,35 @@ public class CommandResponse {
* @param command command name
*/
public CommandResponse(String command) {
- this(command, null);
+ this(command, null, HttpServletResponse.SC_OK);
}
/**
* Creates a new response.
*
* @param command command name
* @param error error string (may be null)
+ * @param statusCode http status code
*/
- public CommandResponse(String command, String error) {
+ public CommandResponse(String command, String error, int statusCode) {
+ this(command, error, statusCode, null);
+ }
+
+
+ /**
+ * Creates a new response.
+ *
+ * @param command command name
+ * @param error error string (may be null)
+ * @param statusCode http status code
+ * @param inputStream inputStream to send out data (may be null)
+ */
+ public CommandResponse(final String command, final String error, final int statusCode, final InputStream inputStream) {
this.command = command;
this.error = error;
data = new LinkedHashMap<String, Object>();
+ headers = new HashMap<>();
+ this.statusCode = statusCode;
+ this.inputStream = inputStream;
}
/**
@@ -77,6 +100,38 @@ public class CommandResponse {
}
/**
+ * Gets the http status code
+ *
+ * @return http status code
+ */
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ /**
+ * Sets the http status code
+ */
+ public void setStatusCode(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ /**
+ * Gets the InputStream (may be null).
+ *
+ * @return InputStream
+ */
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ /**
+ * Sets the InputStream
+ */
+ public void setInputStream(final InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ /**
* Adds a key/value pair to this response.
*
* @param key key
@@ -97,6 +152,25 @@ public class CommandResponse {
}
/**
+ * Adds a header to this response.
+ *
+ * @param name name of the header
+ * @param value value of the header
+ */
+ public void addHeader(final String name, final String value) {
+ headers.put(name, value);
+ }
+
+ /**
+ * Returns all headers
+ *
+ * @return map representation of all headers
+ */
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ /**
* Converts this response to a map. The returned map is mutable, and
* changes to it do not reflect back into this response.
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 236c7ec24..848583a0f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -18,8 +18,11 @@
package org.apache.zookeeper.server.admin;
+import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX;
import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
@@ -31,7 +34,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletResponse;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.Environment.Entry;
import org.apache.zookeeper.Version;
@@ -41,6 +46,7 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.persistence.SnapshotInfo;
+import org.apache.zookeeper.server.persistence.Util;
import org.apache.zookeeper.server.quorum.Follower;
import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import org.apache.zookeeper.server.quorum.Leader;
@@ -51,7 +57,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.RateLimiter;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,10 +112,12 @@ public class Commands {
Map<String, String> kwargs) {
Command command = getCommand(cmdName);
if (command == null) {
- return new CommandResponse(cmdName, "Unknown command: " + cmdName);
+ // set the status code to 200 to keep the current behavior of existing commands
+ return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
}
if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
- return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests");
+ // set the status code to 200 to keep the current behavior of existing commands
+ return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
}
return command.run(zkServer, kwargs);
}
@@ -144,6 +154,7 @@ public class Commands {
registerCommand(new ObserverCnxnStatResetCommand());
registerCommand(new RuokCommand());
registerCommand(new SetTraceMaskCommand());
+ registerCommand(new SnapshotCommand());
registerCommand(new SrvrCommand());
registerCommand(new StatCommand());
registerCommand(new StatResetCommand());
@@ -531,6 +542,93 @@ public class Commands {
}
/**
+ * Take a snapshot of current server and stream out the data.
+ *
+ * Argument:
+ * - "streaming": optional String to indicate whether streaming out data
+ *
+ * Returned snapshot as stream if streaming is true and metadata of the snapshot
+ * - "last_zxid": String
+ * - "snapshot_size": String
+ */
+ public static class SnapshotCommand extends CommandBase {
+ static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
+
+ static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
+ static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
+
+ static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
+ static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS";
+
+ private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000"));
+
+ private final RateLimiter rateLimiter;
+
+ public SnapshotCommand() {
+ super(Arrays.asList("snapshot", "snap"));
+ rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS);
+ }
+
+ @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
+ justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
+ @Override
+ public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
+ final CommandResponse response = initializeResponse();
+
+ // check feature flag
+ final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "false"));
+ if (!snapshotEnabled) {
+ response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ LOG.warn("Snapshot command is disabled");
+ return response;
+ }
+
+ if (!zkServer.isSerializeLastProcessedZxidEnabled()) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ LOG.warn("Snapshot command requires serializeLastProcessedZxidEnable flag is set to true");
+ return response;
+ }
+
+ // check rate limiting
+ if (!rateLimiter.allow()) {
+ response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429);
+ ServerMetrics.getMetrics().SNAPSHOT_RATE_LIMITED_COUNT.add(1);
+ LOG.warn("Snapshot request was rate limited");
+ return response;
+ }
+
+ // check the streaming query param
+ boolean streaming = true;
+ if (kwargs.containsKey(REQUEST_QUERY_PARAM_STREAMING)) {
+ streaming = Boolean.parseBoolean(kwargs.get(REQUEST_QUERY_PARAM_STREAMING));
+ }
+
+ // take snapshot and stream out data if needed
+ try {
+ final File snapshotFile = zkServer.takeSnapshot(false, false, true);
+ final long lastZxid = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX);
+ response.addHeader(RESPONSE_HEADER_LAST_ZXID, "0x" + ZxidUtils.zxidToString(lastZxid));
+
+ final long size = snapshotFile.length();
+ response.addHeader(RESPONSE_HEADER_SNAPSHOT_SIZE, String.valueOf(size));
+
+ if (size == 0) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1);
+ LOG.warn("Snapshot file {} is empty", snapshotFile);
+ } else if (streaming) {
+ response.setInputStream(new FileInputStream(snapshotFile));
+ }
+ } catch (final Exception e) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1);
+ LOG.warn("Exception occurred when taking the snapshot via the snapshot admin command", e);
+ }
+ return response;
+ }
+ }
+
+ /**
* Server information. Returned map contains:
* - "version": String
* version of server
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
index 99241dcbd..3c82f8552 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.SecretUtils;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.auth.IPAuthenticationProvider;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler;
@@ -259,15 +260,26 @@ public class JettyAdminServer implements AdminServer {
}
// Run the command
- CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+ final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+ response.setStatus(cmdResponse.getStatusCode());
- // Format and print the output of the command
- CommandOutputter outputter = new JsonOutputter();
- response.setStatus(HttpServletResponse.SC_OK);
- response.setContentType(outputter.getContentType());
- outputter.output(cmdResponse, response.getWriter());
+ final Map<String, String> headers = cmdResponse.getHeaders();
+ for (final Map.Entry<String, String> header : headers.entrySet()) {
+ response.addHeader(header.getKey(), header.getValue());
+ }
+ final String clientIP = IPAuthenticationProvider.getClientIPAddress(request);
+ if (cmdResponse.getInputStream() == null) {
+ // Format and print the output of the command
+ CommandOutputter outputter = new JsonOutputter(clientIP);
+ response.setContentType(outputter.getContentType());
+ outputter.output(cmdResponse, response.getWriter());
+ } else {
+ // Stream out the output of the command
+ CommandOutputter outputter = new StreamOutputter(clientIP);
+ response.setContentType(outputter.getContentType());
+ outputter.output(cmdResponse, response.getOutputStream());
+ }
}
-
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
index 7d9457453..44c88de9d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
@@ -35,12 +35,14 @@ public class JsonOutputter implements CommandOutputter {
public static final String ERROR_RESPONSE = "{\"error\": \"Exception writing command response to JSON\"}";
private ObjectMapper mapper;
+ private final String clientIP;
- public JsonOutputter() {
+ public JsonOutputter(final String clientIP) {
mapper = new ObjectMapper();
mapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
+ this.clientIP = clientIP;
}
@Override
@@ -59,7 +61,7 @@ public class JsonOutputter implements CommandOutputter {
LOG.warn("Exception writing command response to JSON:", e);
pw.write(ERROR_RESPONSE);
} catch (IOException e) {
- LOG.warn("Exception writing command response to JSON:", e);
+ LOG.warn("Exception writing command response as JSON to {}", clientIP, e);
pw.write(ERROR_RESPONSE);
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
new file mode 100644
index 000000000..e8f68e649
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.zookeeper.common.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for streaming data out.
+ */
+public class StreamOutputter implements CommandOutputter{
+ private static final Logger LOG = LoggerFactory.getLogger(StreamOutputter.class);
+ private final String clientIP;
+
+ public StreamOutputter(final String clientIP) {
+ this.clientIP = clientIP;
+ }
+
+ @Override
+ public String getContentType() {
+ return "application/octet-stream";
+ }
+
+ @Override
+ public void output(final CommandResponse response, final OutputStream os) {
+ try (final InputStream is = response.getInputStream()){
+ IOUtils.copyBytes(is, os, 1024, true);
+ } catch (final IOException e) {
+ LOG.error("Exception occurred when streaming out data to {}", clientIP, e);
+ }
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
index b93e55a32..9f6fb4005 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
@@ -18,11 +18,14 @@
package org.apache.zookeeper.server.auth;
+import java.util.StringTokenizer;
+import javax.servlet.http.HttpServletRequest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.ServerCnxn;
public class IPAuthenticationProvider implements AuthenticationProvider {
+ private static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For";
public String getScheme() {
return "ip";
@@ -128,4 +131,18 @@ public class IPAuthenticationProvider implements AuthenticationProvider {
return true;
}
+ /**
+ * Returns the HTTP(s) client IP address
+ * @param request HttpServletRequest
+ * @return IP address
+ */
+ public static String getClientIPAddress(final HttpServletRequest request) {
+ // to handle the case that a HTTP(s) client connects via a proxy or load balancer
+ final String xForwardedForHeader = request.getHeader(X_FORWARDED_FOR_HEADER_NAME);
+ if (xForwardedForHeader == null) {
+ return request.getRemoteAddr();
+ }
+ // the format of the field is: X-Forwarded-For: client, proxy1, proxy2 ...
+ return new StringTokenizer(xForwardedForHeader, ",").nextToken().trim();
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index df8a59c1a..1a91d1c30 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -99,6 +99,11 @@ public class FileSnap implements SnapShot {
SnapStream.checkSealIntegrity(snapIS, ia);
}
+ // deserialize the last processed zxid and check the intact
+ if (dt.deserializeLastProcessedZxid(ia)) {
+ SnapStream.checkSealIntegrity(snapIS, ia);
+ }
+
foundValid = true;
break;
} catch (IOException e) {
@@ -255,6 +260,11 @@ public class FileSnap implements SnapShot {
SnapStream.sealStream(snapOS, oa);
}
+ // serialize the last processed zxid and add another CRC check
+ if (dt.serializeLastProcessedZxid(oa)) {
+ SnapStream.sealStream(snapOS, oa);
+ }
+
lastSnapshotInfo = new SnapshotInfo(
Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
snapShot.lastModified() / 1000);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 403720bda..16f9cf71e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -468,9 +468,10 @@ public class FileTxnSnapLog {
* @param sessionsWithTimeouts the session timeouts to be
* serialized onto disk
* @param syncSnap sync the snapshot immediately after write
+ * @return the snapshot file
* @throws IOException
*/
- public void save(
+ public File save(
DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap) throws IOException {
@@ -479,6 +480,7 @@ public class FileTxnSnapLog {
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
try {
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
+ return snapshotFile;
} catch (IOException e) {
if (snapshotFile.length() == 0) {
/* This may be caused by a full disk. In such a case, the server
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java
new file mode 100644
index 000000000..cb9473306
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.common.Time;
+
+/**
+ * A class that provides simple interval-based rate limiting implementation.
+ */
+public class RateLimiter {
+ private final int rate;
+ private final long intervalInMs;
+ private long lastTimeReset;
+ private final AtomicInteger remained;
+
+ public RateLimiter(final int rate, final long interval, final TimeUnit unit) {
+ this.rate = rate;
+ this.intervalInMs = unit.toMillis(interval);
+ this.lastTimeReset = Time.currentElapsedTime();
+ this.remained = new AtomicInteger(rate);
+ }
+
+ public boolean allow() {
+ final long now = Time.currentElapsedTime();
+
+ // reset the rate if interval passed
+ if (now > lastTimeReset + intervalInMs) {
+ remained.set(rate);
+ lastTimeReset = now;
+ }
+
+ int value = remained.get();
+ boolean allowed = false;
+
+ // to handle race condition
+ while (!allowed && value > 0) {
+ allowed = remained.compareAndSet(value, value - 1);
+ value = remained.get();
+ }
+ return allowed;
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java
new file mode 100644
index 000000000..a9953b7c0
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TakeSnapshotTest extends ClientBase {
+ private static final String BASE_PATH = "/takeSnapshotTest";
+ private static final int NODE_COUNT = 100;
+ private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+ private ZooKeeper zk;
+
+ @TempDir
+ static File dataDir;
+
+ @TempDir
+ static File logDir;
+
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ ClientBase.setupTestEnv();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+
+ @Test
+ public void testTakeSnapshotAndRestore() throws Exception {
+ ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+
+ final int port = Integer.parseInt(HOSTPORT.split(":")[1]);
+ final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+ serverCnxnFactory.startup(zks);
+ assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+ try {
+ zk = ClientBase.createZKClient(HOSTPORT);
+ for (int i = 0; i < NODE_COUNT; i++) {
+ final String path = BASE_PATH + "-" + i;
+ zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ // takeSnapshot
+ zks.takeSnapshot(false, false, true);
+
+ // clean up
+ zk.close();
+ zks.shutdown();
+
+ // start server again and assert the data restored from snapshot
+ zks = new ZooKeeperServer(dataDir, logDir, 3000);
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+
+ serverCnxnFactory.startup(zks);
+ assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+ zk = ClientBase.createZKClient(HOSTPORT);
+ for (int i = 0; i < NODE_COUNT; i++) {
+ final String path = BASE_PATH + "-" + i;
+ final String expectedData = String.valueOf(i);
+ assertArrayEquals(expectedData.getBytes(), zk.getData(path, null, null));
+ }
+ assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/"));
+ } finally {
+ zks.shutdown();
+ serverCnxnFactory.shutdown();
+ }
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index eb8fcbf93..826a875b4 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -71,6 +71,10 @@ public class ZKTestCase {
// accidentally attempting to start multiple admin servers on the
// same port.
System.setProperty("zookeeper.admin.enableServer", "false");
+
+ // disable rate limiting on the snapshot admin API
+ System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0");
+
// ZOOKEEPER-2693 disables all 4lw by default.
// Here we enable the 4lw which ZooKeeper tests depends.
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
index 151e87343..07a69f14f 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
@@ -517,6 +517,21 @@ public class DataTreeTest extends ZKTestCase {
}
@Test
+ public void testSerializeLastProcessedZxid_Enabled() throws Exception {
+ testSerializeLastProcessedZxid(true, true);
+ }
+
+ @Test
+ public void testSerializeLastProcessedZxid_Disabled() throws Exception {
+ testSerializeLastProcessedZxid(false, false);
+ }
+
+ @Test
+ public void testSerializeLastProcessedZxid_BackwardCompatibility() throws Exception {
+ testSerializeLastProcessedZxid(true, false);
+ }
+
+ @Test
public void testDataTreeMetrics() throws Exception {
ServerMetrics.getMetrics().resetAll();
@@ -616,4 +631,44 @@ public class DataTreeTest extends ZKTestCase {
}
}
+ private DataTree buildDataTreeForTest() {
+ final DataTree dt = new DataTree();
+ assertEquals(dt.lastProcessedZxid, 0);
+
+ dt.processTxn(
+ new TxnHeader(100, 1000, 1, 30, ZooDefs.OpCode.create),
+ new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1),
+ null);
+ assertEquals(dt.lastProcessedZxid, 1);
+ return dt;
+ }
+
+ private void testSerializeLastProcessedZxid(boolean enableForSerialize, boolean enableForDeserialize) throws Exception{
+ final DataTree dt = buildDataTreeForTest();
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enableForSerialize);
+ final BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos);
+ if (enableForSerialize) {
+ assertTrue(dt.serializeLastProcessedZxid(oa));
+ } else {
+ assertFalse(dt.serializeLastProcessedZxid(oa));
+ }
+ baos.flush();
+
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enableForDeserialize);
+ try (final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) {
+ final InputArchive ia = BinaryInputArchive.getArchive(bais);
+ if (enableForDeserialize) {
+ assertTrue(dt.deserializeLastProcessedZxid(ia));
+ } else {
+ assertFalse(dt.deserializeLastProcessedZxid(ia));
+ }
+ assertEquals(dt.lastProcessedZxid, 1);
+ }
+ } finally {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+ }
+ }
+
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
index 89dac35a3..4e8c6f8e2 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
@@ -166,7 +166,7 @@ public class SnapshotDigestTest extends ClientBase {
private void testCompatibleHelper(Boolean enabledBefore, Boolean enabledAfter) throws Exception {
ZooKeeperServer.setDigestEnabled(enabledBefore);
-
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enabledBefore);
// restart the server to cache the option change
reloadSnapshotAndCheckDigest();
@@ -179,6 +179,7 @@ public class SnapshotDigestTest extends ClientBase {
server.takeSnapshot();
ZooKeeperServer.setDigestEnabled(enabledAfter);
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enabledAfter);
reloadSnapshotAndCheckDigest();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
index 6f6a29099..ae58fa563 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
@@ -80,11 +80,13 @@ public class TxnLogDigestTest extends ClientBase {
@Override
public void setupCustomizedEnv() {
ZooKeeperServer.setDigestEnabled(true);
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
}
@Override
public void cleanUpCustomizedEnv() {
ZooKeeperServer.setDigestEnabled(false);
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
}
@BeforeAll
@@ -189,6 +191,7 @@ public class TxnLogDigestTest extends ClientBase {
QuorumPeerMainTest.waitForOne(zk, States.CONNECTING);
ZooKeeperServer.setDigestEnabled(digestEnabled);
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled);
startServer();
QuorumPeerMainTest.waitForOne(zk, States.CONNECTED);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java
index cfa130690..8009dab47 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java
@@ -18,8 +18,10 @@
package org.apache.zookeeper.server.admin;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.HashMap;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
import org.apache.zookeeper.ZKTestCase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -30,13 +32,16 @@ public class CommandResponseTest extends ZKTestCase {
@BeforeEach
public void setUp() throws Exception {
- r = new CommandResponse("makemeasandwich", "makeityourself");
+ r = new CommandResponse("makemeasandwich", "makeityourself", HttpServletResponse.SC_OK);
}
@Test
public void testGetters() {
assertEquals("makemeasandwich", r.getCommand());
assertEquals("makeityourself", r.getError());
+ assertEquals(HttpServletResponse.SC_OK, r.getStatusCode());
+ assertEquals(new HashMap(), r.getHeaders());
+ assertNull(r.getInputStream());
}
@Test
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
index ff8dc2033..2d74776e3 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
@@ -18,18 +18,25 @@
package org.apache.zookeeper.server.admin;
+import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerStats;
@@ -50,37 +57,52 @@ public class CommandsTest extends ClientBase {
* - the primary name of the command
* @param kwargs
* - keyword arguments to the command
+ * @param expectedHeaders
+ * - expected HTTP response headers
+ * @param expectedStatusCode
+ * - expected HTTP status code
* @param fields
* - the fields that are expected in the returned Map
* @throws IOException
* @throws InterruptedException
*/
- public void testCommand(String cmdName, Map<String, String> kwargs, Field... fields) throws IOException, InterruptedException {
+ private void testCommand(String cmdName, Map<String, String> kwargs,
+ Map<String, String> expectedHeaders, int expectedStatusCode,
+ Field... fields) throws IOException, InterruptedException {
ZooKeeperServer zks = serverFactory.getZooKeeperServer();
- Map<String, Object> result = Commands.runCommand(cmdName, zks, kwargs).toMap();
-
- assertTrue(result.containsKey("command"));
- // This is only true because we're setting cmdName to the primary name
- assertEquals(cmdName, result.remove("command"));
- assertTrue(result.containsKey("error"));
- assertNull(result.remove("error"), "error: " + result.get("error"));
-
- for (Field field : fields) {
- String k = field.key;
- assertTrue(result.containsKey(k),
- "Result from command " + cmdName + " missing field \"" + k + "\"" + "\n" + result);
- Class<?> t = field.type;
- Object v = result.remove(k);
- assertTrue(t.isAssignableFrom(v.getClass()),
- "\"" + k + "\" field from command " + cmdName
- + " should be of type " + t + ", is actually of type " + v.getClass());
+ final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs);
+ assertNotNull(commandResponse);
+ assertEquals(expectedStatusCode, commandResponse.getStatusCode());
+ try (final InputStream responseStream = commandResponse.getInputStream()) {
+ if (Boolean.parseBoolean(kwargs.getOrDefault(REQUEST_QUERY_PARAM_STREAMING, "false"))) {
+ assertNotNull(responseStream, "InputStream in the response of command " + cmdName + " should not be null");
+ } else {
+ Map<String, Object> result = commandResponse.toMap();
+ assertTrue(result.containsKey("command"));
+ // This is only true because we're setting cmdName to the primary name
+ assertEquals(cmdName, result.remove("command"));
+ assertTrue(result.containsKey("error"));
+ assertNull(result.remove("error"), "error: " + result.get("error"));
+
+ for (Field field : fields) {
+ String k = field.key;
+ assertTrue(result.containsKey(k),
+ "Result from command " + cmdName + " missing field \"" + k + "\"" + "\n" + result);
+ Class<?> t = field.type;
+ Object v = result.remove(k);
+ assertTrue(t.isAssignableFrom(v.getClass()),
+ "\"" + k + "\" field from command " + cmdName
+ + " should be of type " + t + ", is actually of type " + v.getClass());
+ }
+
+ assertTrue(result.isEmpty(), "Result from command " + cmdName + " contains extra fields: " + result);
+ }
}
-
- assertTrue(result.isEmpty(), "Result from command " + cmdName + " contains extra fields: " + result);
+ assertEquals(expectedHeaders, commandResponse.getHeaders());
}
public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
- testCommand(cmdName, new HashMap<String, String>(), fields);
+ testCommand(cmdName, new HashMap<String, String>(), new HashMap<>(), HttpServletResponse.SC_OK, fields);
}
private static class Field {
@@ -91,7 +113,16 @@ public class CommandsTest extends ClientBase {
this.key = key;
this.type = type;
}
+ }
+ @Test
+ public void testSnapshot_streaming() throws IOException, InterruptedException {
+ testSnapshot(true);
+ }
+
+ @Test
+ public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
+ testSnapshot(false);
}
@Test
@@ -208,7 +239,7 @@ public class CommandsTest extends ClientBase {
public void testSetTraceMask() throws IOException, InterruptedException {
Map<String, String> kwargs = new HashMap<String, String>();
kwargs.put("traceMask", "1");
- testCommand("set_trace_mask", kwargs, new Field("tracemask", Long.class));
+ testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
}
@Test
@@ -288,4 +319,22 @@ public class CommandsTest extends ClientBase {
assertThat(response.toMap().containsKey("secure_connections"), is(true));
}
+ private void testSnapshot(final boolean streaming) throws IOException, InterruptedException {
+ System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+ System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+ System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+ try {
+ final Map<String, String> kwargs = new HashMap<>();
+ kwargs.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
+ final Map<String, String> expectedHeaders = new HashMap<>();
+ expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
+ expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
+ testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK);
+ } finally {
+ System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
+ System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+ System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+ }
+ }
+
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
index c269e1994..b06cde6fd 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
@@ -61,9 +61,9 @@ public class JettyAdminServerTest extends ZKTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(JettyAdminServerTest.class);
- private static final String URL_FORMAT = "http://localhost:%d/commands";
- private static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
- private static final int jettyAdminPort = PortAssignment.unique();
+ static final String URL_FORMAT = "http://localhost:%d/commands";
+ static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
+ static final int jettyAdminPort = PortAssignment.unique();
@BeforeEach
public void enableServer() {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java
new file mode 100644
index 000000000..5b5dbd979
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
+import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT;
+import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.IOUtils;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SnapshotCommandTest extends ZKTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class);
+
+ private static final String PATH = "/snapshot_test";
+ private static final int NODE_COUNT = 10;
+
+ private final String hostPort = "127.0.0.1:" + PortAssignment.unique();
+ private ServerCnxnFactory cnxnFactory;
+ private JettyAdminServer adminServer;
+ private ZooKeeperServer zks;
+ private ZooKeeper zk;
+
+ @TempDir
+ static File dataDir;
+
+ @TempDir
+ static File logDir;
+
+ @BeforeAll
+ public void setup() throws Exception {
+ // start ZookeeperServer
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+ zks = new ZooKeeperServer(dataDir, logDir, 3000);
+ final int port = Integer.parseInt(hostPort.split(":")[1]);
+ cnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+ cnxnFactory.startup(zks);
+ assertTrue(ClientBase.waitForServerUp(hostPort, 120000));
+
+ // start AdminServer
+ System.setProperty("zookeeper.admin.enableServer", "true");
+ System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
+ System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+ System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+ System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+
+ adminServer = new JettyAdminServer();
+ adminServer.setZooKeeperServer(zks);
+ adminServer.start();
+
+ // create Zookeeper client and test data
+ zk = ClientBase.createZKClient(hostPort);
+ createData(zk, NODE_COUNT);
+ }
+
+ @AfterAll
+ public void tearDown() throws Exception {
+ System.clearProperty("zookeeper.4lw.commands.whitelist");
+ System.clearProperty("zookeeper.admin.enableServer");
+ System.clearProperty("zookeeper.admin.serverPort");
+ System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
+ System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+ System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+
+ if (zk != null) {
+ zk.close();
+ }
+
+ if (adminServer != null) {
+ adminServer.shutdown();
+ }
+
+ if (cnxnFactory != null) {
+ cnxnFactory.shutdown();
+ }
+
+ if (zks != null) {
+ zks.shutdown();
+ }
+ }
+
+ @Test
+ public void testSnapshotCommand_streaming() throws Exception {
+ // take snapshot with streaming
+ final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+ // validate snapshot response
+ assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+ validateResponseHeaders(snapshotConn);
+ final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
+ try (final InputStream inputStream = snapshotConn.getInputStream();
+ final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
+ IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+ final long fileSize = Files.size(snapshotFile.toPath());
+ assertTrue(fileSize > 0);
+ }
+ }
+
+ @Test
+ public void testSnapshotCommand_nonStreaming() throws Exception {
+ // take snapshot without streaming
+ final HttpURLConnection snapshotConn = sendSnapshotRequest(false);
+
+ // validate snapshot response
+ assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+ validateResponseHeaders(snapshotConn);
+ displayResponsePayload(snapshotConn);
+ }
+
+ @Test
+ public void testSnapshotCommand_disabled() throws Exception {
+ System.setProperty(ADMIN_SNAPSHOT_ENABLED, "false");
+ try {
+ // take snapshot
+ final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+ // validate snapshot response
+ assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, snapshotConn.getResponseCode());
+ } finally {
+ System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+ }
+ }
+
+ @Test
+ public void testSnapshotCommand_serializeLastZxidDisabled() throws Exception {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+ try {
+ // take snapshot
+ final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+ // validate snapshot response
+ assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, snapshotConn.getResponseCode());
+ } finally {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+ }
+ }
+
+ private void createData(final ZooKeeper zk, final long count) throws Exception {
+ try {
+ zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (final KeeperException.NodeExistsException ignore) {
+ // ignore
+ }
+
+ for (int i = 0; i < count; i++) {
+ final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ LOG.info("Node created. path={}" + processNodePath);
+ }
+ }
+
+ private HttpURLConnection sendSnapshotRequest(final boolean streaming) throws Exception {
+ final String queryParamsStr = buildQueryStringForSnapshotCommand(streaming);
+ final URL snapshotURL = new URL(String.format(URL_FORMAT + "/snapshot", jettyAdminPort) + "?" + queryParamsStr);
+ final HttpURLConnection snapshotConn = (HttpURLConnection) snapshotURL.openConnection();
+ snapshotConn.setRequestMethod("GET");
+
+ return snapshotConn;
+ }
+
+ private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
+ return getParamsString(parameters);
+ }
+
+ private static String getParamsString(final Map<String, String> params) throws UnsupportedEncodingException {
+ final StringBuilder result = new StringBuilder();
+
+ for (final Map.Entry<String, String> entry : params.entrySet()) {
+ result.append(URLEncoder.encode(entry.getKey(), "UTF-8"));
+ result.append("=");
+
+ result.append(URLEncoder.encode(entry.getValue(), "UTF-8"));
+ result.append("&");
+ }
+
+ final String resultString = result.toString();
+ return resultString.length() > 0
+ ? resultString.substring(0, resultString.length() - 1)
+ : resultString;
+ }
+
+ private void validateResponseHeaders(final HttpURLConnection conn) {
+ LOG.info("Header:{}, Value:{}",
+ Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID,
+ conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID));
+ assertNotNull(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID));
+
+ LOG.info("Header:{}, Value:{}",
+ Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE,
+ conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE));
+ assertNotNull(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE));
+ assertTrue(Integer.parseInt(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE)) > 0);
+ }
+
+ private void displayResponsePayload(final HttpURLConnection conn) throws IOException {
+ final StringBuilder sb = new StringBuilder();
+ try (final BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ sb.append(inputLine);
+ }
+ LOG.info("Response payload: {}", sb);
+ }
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
index 44a009934..466cf3f4b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
@@ -393,6 +393,10 @@ public class FileTxnSnapLogTest {
SnapStream.setStreamMode(snappyEnabled ? SnapStream.StreamMode.SNAPPY : SnapStream.StreamMode.DEFAULT_MODE);
ZooKeeperServer.setDigestEnabled(digestEnabled);
+ // set the flag to be the same as digestEnabled to make sure the last serialized data
+ // (for example, datatree, digest, lastProcessedZxid) is setup as expected for backward
+ // compatibility test.
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled);
TxnHeader txnHeader = new TxnHeader(1, 1, 1, 1 + 1, ZooDefs.OpCode.create);
CreateTxn txn = new CreateTxn("/" + 1, "data".getBytes(), null, false, 1);
Request request = new Request(1, 1, 1, txnHeader, txn, 1);
@@ -401,6 +405,10 @@ public class FileTxnSnapLogTest {
int expectedNodeCount = dataTree.getNodeCount();
ZooKeeperServer.setDigestEnabled(!digestEnabled);
+ // set the flag to be the same as digestEnabled to make sure the last serialized data
+ // (for example, datatree, digest, lastProcessedZxid) is setup as expected for backward
+ // compatibility test.
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(!digestEnabled);
snaplog.restore(dataTree, sessions, (hdr, rec, digest) -> { });
assertEquals(expectedNodeCount, dataTree.getNodeCount());
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
index c39bc54e0..a75fb3830 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
@@ -150,7 +150,11 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
@Override
public void process(String path) {
LOG.info("Take a snapshot");
- zkServer.takeSnapshot(true);
+ try {
+ zkServer.takeSnapshot(true);
+ } catch (final IOException e) {
+ // ignored as it should never reach here because of System.exit() call
+ }
}
});
@@ -373,7 +377,11 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
public void process(long sessionId) {
LOG.info("Take snapshot");
if (shouldTakeSnapshot.getAndSet(false)) {
- zkServer.takeSnapshot(true);
+ try {
+ zkServer.takeSnapshot(true);
+ } catch (IOException e) {
+ // ignored as it should never reach here because of System.exit() call
+ }
}
}
});
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java
new file mode 100644
index 000000000..b19e96823
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+public class RateLimiterTest {
+
+ @Test
+ public void testAllow_withinInterval() {
+ final int rate = 2;
+ final RateLimiter rateLimiter = new RateLimiter(rate, 5, TimeUnit.SECONDS);
+ for (int i = 0; i < rate; i++) {
+ assertTrue(rateLimiter.allow());
+ }
+ assertFalse(rateLimiter.allow());
+ }
+
+ @Test
+ public void testAllow_withinInterval_multiThreaded() {
+ final int rate = 10;
+
+ final RateLimiter rateLimiter = new RateLimiter(rate, 5, TimeUnit.SECONDS);
+ final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(rate + 1);
+ for (int i = 0; i < rate; i++) {
+ executor.execute(() -> assertTrue(rateLimiter.allow()));
+ }
+ executor.execute(() -> assertFalse(rateLimiter.allow()));
+ }
+
+ @Test
+ public void testAllow_exceedInterval() throws Exception {
+ final int interval = 1;
+
+ final RateLimiter rateLimiter = new RateLimiter(1, interval, TimeUnit.SECONDS);
+ assertTrue(rateLimiter.allow());
+ assertFalse(rateLimiter.allow());
+ Thread.sleep(TimeUnit.SECONDS.toMillis(interval + 1));
+ assertTrue(rateLimiter.allow());
+ }
+}