summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorli4wang <68786536+li4wang@users.noreply.github.com>2023-01-26 09:12:14 -0800
committerGitHub <noreply@github.com>2023-01-26 18:12:14 +0100
commitd35bdfb9d3a4bc3d94f22785604418d4f650365d (patch)
tree6b53c3ac8299a87b1b10fad39a9b0907d589ca01
parent778c4519e676278d2b76330df6e60032437a9973 (diff)
downloadzookeeper-d35bdfb9d3a4bc3d94f22785604418d4f650365d.tar.gz
ZOOKEEPER-4571: Admin server API for restore database from a snapshot (#1961)
Provides a restore command for restoring database from a snapshot Author: Li Wang <liwang@apple.com> Co-authored-by: liwang <liwang@apple.com>
-rw-r--r--zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md23
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java18
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java40
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java82
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java26
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java222
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java43
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java57
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java37
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java2
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java2
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java4
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java141
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java (renamed from zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java)55
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java85
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java2
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java (renamed from zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java)212
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java2
18 files changed, 891 insertions, 162 deletions
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index b4c01d35d..636c6cde2 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -2115,16 +2115,22 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
**New in 3.9.0:** The following
options are used to configure the [AdminServer](#sc_adminserver).
+* *admin.rateLimiterIntervalInMS* :
+ (Java system property: **zookeeper.admin.rateLimiterIntervalInMS**)
+ The time interval for rate limiting admin command to protect the server.
+ Defaults to 5 mins.
+
* *admin.snapshot.enabled* :
(Java system property: **zookeeper.admin.snapshot.enabled**)
The flag for enabling the snapshot command. Defaults to false.
It will be enabled by default once the auth support for admin server commands
is available.
-
-* *admin.snapshot.intervalInMS* :
- (Java system property: **zookeeper.admin.snapshot.intervalInMS**)
- The time interval for rate limiting snapshot command to protect the server.
- Defaults to 5 mins.
+
+* *admin.restore.enabled* :
+ (Java system property: **zookeeper.admin.restore.enabled**)
+ The flag for enabling the restore command. Defaults to false.
+ It will be enabled by default once the auth support for admin server commands
+ is available.
**New in 3.7.1:** The following
options are used to configure the [AdminServer](#sc_adminserver).
@@ -2641,6 +2647,13 @@ Available commands include:
Reset all observer connection statistics. Companion command to *observers*.
No new fields returned.
+* *restore/rest* :
+ Restore database from snapshot input stream on the current server.
+ Returns the following data in response payload:
+ "last_zxid": String
+ Note: this API is rate-limited (once every 5 mins by default) to protect the server
+ from being over-loaded.
+
* *ruok* :
No-op command, check if the server is running.
A response does not necessarily indicate that the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index ef28e32e1..e3476ff6d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -74,6 +74,9 @@ public final class ServerMetrics {
SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
+ RESTORE_TIME = metricsContext.getSummary("restore_time", DetailLevel.BASIC);
+ RESTORE_ERROR_COUNT = metricsContext.getCounter("restore_error_count");
+ RESTORE_RATE_LIMITED_COUNT = metricsContext.getCounter("restore_rate_limited_count");
DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
@@ -289,6 +292,21 @@ public final class ServerMetrics {
public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
/**
+ * Restore time
+ */
+ public final Summary RESTORE_TIME;
+
+ /**
+ * Restore error count
+ */
+ public final Counter RESTORE_ERROR_COUNT;
+
+ /**
+ * Restore rate limited count
+ */
+ public final Counter RESTORE_RATE_LIMITED_COUNT;
+
+ /**
* Db init time (snapshot loading + txnlog replay)
*/
public final Summary DB_INIT_TIME;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index 05ada8cee..f20af88d5 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.zip.CheckedInputStream;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
@@ -48,8 +49,10 @@ import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.persistence.FileSnap;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.persistence.SnapStream;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
@@ -621,6 +624,43 @@ public class ZKDatabase {
}
/**
+ * Deserialize a snapshot that contains FileHeader from an input archive. It is used by
+ * the admin restore command.
+ *
+ * @param ia the input archive to deserialize from
+ * @param is the CheckInputStream to check integrity
+ *
+ * @throws IOException
+ */
+ public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
+ clear();
+
+ // deserialize data tree
+ final DataTree dataTree = getDataTree();
+ final FileSnap filesnap = new FileSnap(snapLog.getSnapDir());
+ filesnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
+ SnapStream.checkSealIntegrity(is, ia);
+
+ // deserialize digest and check integrity
+ if (dataTree.deserializeZxidDigest(ia, 0)) {
+ SnapStream.checkSealIntegrity(is, ia);
+ }
+
+ // deserialize lastProcessedZxid and check integrity
+ if (dataTree.deserializeLastProcessedZxid(ia)) {
+ SnapStream.checkSealIntegrity(is, ia);
+ }
+
+ // compare the digest to find inconsistency
+ if (dataTree.getDigestFromLoadedSnapshot() != null) {
+ dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
+ }
+
+ initialized = true;
+ }
+
+
+ /**
* serialize the snapshot
* @param oa the output archive to which the snapshot needs to be serialized
* @throws IOException
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index d61f269c9..bbb0f7fee 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -19,9 +19,11 @@
package org.apache.zookeeper.server;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -34,11 +36,16 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
import javax.security.sasl.SaslException;
+import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.KeeperException;
@@ -127,6 +134,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
private static boolean closeSessionTxnEnabled = true;
+ private volatile CountDownLatch restoreLatch;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -541,12 +549,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot() throws IOException {
- takeSnapshot(false);
+ public File takeSnapshot() throws IOException {
+ return takeSnapshot(false);
}
- public void takeSnapshot(boolean syncSnap) throws IOException {
- takeSnapshot(syncSnap, true, false);
+ public File takeSnapshot(boolean syncSnap) throws IOException {
+ return takeSnapshot(syncSnap, true, false);
}
/**
@@ -583,6 +591,61 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return snapFile;
}
+ /**
+ * Restores database from a snapshot. It is used by the restore admin server command.
+ *
+ * @param inputStream input stream of snapshot
+ * @Return last processed zxid
+ * @throws IOException
+ */
+ public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
+ if (inputStream == null) {
+ throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
+ }
+
+ long start = Time.currentElapsedTime();
+ LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
+ getZKDatabase().getDataTreeLastProcessedZxid(),
+ getZKDatabase().dataTree.getNodeCount(),
+ getZKDatabase().getSessionCount());
+
+ // restore to a new zkDatabase
+ final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
+ final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
+ final InputArchive ia = BinaryInputArchive.getArchive(cis);
+ newZKDatabase.deserializeSnapshot(ia, cis);
+ LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+ newZKDatabase.getDataTreeLastProcessedZxid(),
+ newZKDatabase.dataTree.getNodeCount(),
+ newZKDatabase.getSessionCount());
+
+ // create a CountDownLatch
+ restoreLatch = new CountDownLatch(1);
+
+ try {
+ // set to the new zkDatabase
+ setZKDatabase(newZKDatabase);
+
+ // re-create SessionTrack
+ createSessionTracker();
+ } finally {
+ // unblock request submission
+ restoreLatch.countDown();
+ restoreLatch = null;
+ }
+
+ LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+ getZKDatabase().getDataTreeLastProcessedZxid(),
+ getZKDatabase().dataTree.getNodeCount(),
+ getZKDatabase().getSessionCount());
+
+ long elapsed = Time.currentElapsedTime() - start;
+ LOG.info("Restore taken in {} ms", elapsed);
+ ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
+
+ return getLastProcessedZxid();
+ }
+
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
}
@@ -826,6 +889,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* <li>During shutdown the server sets the state to SHUTDOWN, which
* corresponds to the server not running.</li></ul>
*
+ * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
+ * </li></ul>
+ *
* @param state new server state.
*/
protected void setState(State state) {
@@ -1151,6 +1217,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public void submitRequest(Request si) {
+ if (restoreLatch != null) {
+ try {
+ LOG.info("Blocking request submission while restore is in progress");
+ restoreLatch.await();
+ } catch (final InterruptedException e) {
+ LOG.warn("Unexpected interruption", e);
+ }
+ }
enqueueRequest(si);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
index b422715bf..5d06356b2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
@@ -18,6 +18,7 @@
package org.apache.zookeeper.server.admin;
+import java.io.InputStream;
import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -57,19 +58,34 @@ public interface Command {
boolean isServerRequired();
/**
- * Run this command. Commands take a ZooKeeperServer and String-valued
- * keyword arguments and return a map containing any information
+ * Run this command for HTTP GET request. Commands take a ZooKeeperServer, String-valued keyword
+ * arguments and return a CommandResponse object containing any information
* constituting the response to the command. Commands are responsible for
* parsing keyword arguments and performing any error handling if necessary.
* Errors should be reported by setting the "error" entry of the returned
* map with an appropriate message rather than throwing an exception.
*
- * @param zkServer
+ * @param zkServer ZooKeeper server
* @param kwargs keyword -&gt; argument value mapping
- * @return Map representing response to command containing at minimum:
+ * @return CommandResponse representing response to command containing at minimum:
* - "command" key containing the command's primary name
* - "error" key containing a String error message or null if no error
*/
- CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs);
+ CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs);
+ /**
+ * Run this command for HTTP POST. Commands take a ZooKeeperServer and InputStream and
+ * return a CommandResponse object containing any information
+ * constituting the response to the command. Commands are responsible for
+ * parsing keyword arguments and performing any error handling if necessary.
+ * Errors should be reported by setting the "error" entry of the returned
+ * map with an appropriate message rather than throwing an exception.
+ *
+ * @param zkServer ZooKeeper server
+ * @param inputStream InputStream from request
+ * @return CommandResponse representing response to command containing at minimum:
+ * - "command" key containing the command's primary name
+ * - "error" key containing a String error message or null if no error
+ */
+ CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 848583a0f..1911b5a57 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.FileInputStream;
+import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
@@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory;
public class Commands {
static final Logger LOG = LoggerFactory.getLogger(Commands.class);
+ static final String ADMIN_RATE_LIMITER_INTERVAL = "zookeeper.admin.rateLimiterIntervalInMS";
+ private static final long rateLimiterInterval = Integer.parseInt(System.getProperty(ADMIN_RATE_LIMITER_INTERVAL, "300000"));
/** Maps command names to Command instances */
private static Map<String, Command> commands = new HashMap<String, Command>();
@@ -100,16 +103,16 @@ public class Commands {
*
* @param cmdName
* @param zkServer
- * @param kwargs String-valued keyword arguments to the command
+ * @param kwargs String-valued keyword arguments to the command from HTTP GET request
* (may be null if command requires no additional arguments)
* @return Map representing response to command containing at minimum:
* - "command" key containing the command's primary name
* - "error" key containing a String error message or null if no error
*/
- public static CommandResponse runCommand(
- String cmdName,
- ZooKeeperServer zkServer,
- Map<String, String> kwargs) {
+ public static CommandResponse runGetCommand(
+ String cmdName,
+ ZooKeeperServer zkServer,
+ Map<String, String> kwargs) {
Command command = getCommand(cmdName);
if (command == null) {
// set the status code to 200 to keep the current behavior of existing commands
@@ -119,7 +122,36 @@ public class Commands {
// set the status code to 200 to keep the current behavior of existing commands
return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
}
- return command.run(zkServer, kwargs);
+ return command.runGet(zkServer, kwargs);
+ }
+
+ /**
+ * Run the registered command with name cmdName. Commands should not produce
+ * any exceptions; any (anticipated) errors should be reported in the
+ * "error" entry of the returned map. Likewise, if no command with the given
+ * name is registered, this will be noted in the "error" entry.
+ *
+ * @param cmdName
+ * @param zkServer
+ * @param inputStream InputStream from HTTP POST request
+ * @return Map representing response to command containing at minimum:
+ * - "command" key containing the command's primary name
+ * - "error" key containing a String error message or null if no error
+ */
+ public static CommandResponse runPostCommand(
+ String cmdName,
+ ZooKeeperServer zkServer,
+ InputStream inputStream) {
+ Command command = getCommand(cmdName);
+ if (command == null) {
+ // set the status code to 200 to keep the current behavior of existing commands
+ return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
+ }
+ if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
+ // set the status code to 200 to keep the current behavior of existing commands
+ return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
+ }
+ return command.runPost(zkServer, inputStream);
}
/**
@@ -152,6 +184,7 @@ public class Commands {
registerCommand(new LeaderCommand());
registerCommand(new MonitorCommand());
registerCommand(new ObserverCnxnStatResetCommand());
+ registerCommand(new RestoreCommand());
registerCommand(new RuokCommand());
registerCommand(new SetTraceMaskCommand());
registerCommand(new SnapshotCommand());
@@ -170,14 +203,14 @@ public class Commands {
/**
* Reset all connection statistics.
*/
- public static class CnxnStatResetCommand extends CommandBase {
+ public static class CnxnStatResetCommand extends GetCommand {
public CnxnStatResetCommand() {
super(Arrays.asList("connection_stat_reset", "crst"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
zkServer.getServerCnxnFactory().resetAllConnectionStats();
return response;
@@ -190,14 +223,14 @@ public class Commands {
* Server configuration parameters.
* @see ZooKeeperServer#getConf()
*/
- public static class ConfCommand extends CommandBase {
+ public static class ConfCommand extends GetCommand {
public ConfCommand() {
super(Arrays.asList("configuration", "conf", "config"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.putAll(zkServer.getConf().toMap());
return response;
@@ -210,14 +243,14 @@ public class Commands {
* - "connections": list of connection info objects
* @see org.apache.zookeeper.server.ServerCnxn#getConnectionInfo(boolean)
*/
- public static class ConsCommand extends CommandBase {
+ public static class ConsCommand extends GetCommand {
public ConsCommand() {
super(Arrays.asList("connections", "cons"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
ServerCnxnFactory serverCnxnFactory = zkServer.getServerCnxnFactory();
if (serverCnxnFactory != null) {
@@ -239,14 +272,14 @@ public class Commands {
/**
* Information on ZK datadir and snapdir size in bytes
*/
- public static class DirsCommand extends CommandBase {
+ public static class DirsCommand extends GetCommand {
public DirsCommand() {
super(Arrays.asList("dirs"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.put("datadir_size", zkServer.getDataDirSize());
response.put("logdir_size", zkServer.getLogDirSize());
@@ -264,14 +297,14 @@ public class Commands {
* @see ZooKeeperServer#getSessionExpiryMap()
* @see ZooKeeperServer#getEphemerals()
*/
- public static class DumpCommand extends CommandBase {
+ public static class DumpCommand extends GetCommand {
public DumpCommand() {
super(Arrays.asList("dump"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.put("expiry_time_to_session_ids", zkServer.getSessionExpiryMap());
response.put("session_id_to_ephemeral_paths", zkServer.getEphemerals());
@@ -283,14 +316,14 @@ public class Commands {
/**
* All defined environment variables.
*/
- public static class EnvCommand extends CommandBase {
+ public static class EnvCommand extends GetCommand {
public EnvCommand() {
super(Arrays.asList("environment", "env", "envi"), false);
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
for (Entry e : Environment.list()) {
response.put(e.getKey(), e.getValue());
@@ -303,14 +336,14 @@ public class Commands {
/**
* Digest histories for every specific number of txns.
*/
- public static class DigestCommand extends CommandBase {
+ public static class DigestCommand extends GetCommand {
public DigestCommand() {
super(Arrays.asList("hash"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.put("digests", zkServer.getZKDatabase().getDataTree().getDigestLog());
return response;
@@ -322,14 +355,14 @@ public class Commands {
* The current trace mask. Returned map contains:
* - "tracemask": Long
*/
- public static class GetTraceMaskCommand extends CommandBase {
+ public static class GetTraceMaskCommand extends GetCommand {
public GetTraceMaskCommand() {
super(Arrays.asList("get_trace_mask", "gtmk"), false);
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.put("tracemask", ZooTrace.getTextTraceLevel());
return response;
@@ -337,14 +370,14 @@ public class Commands {
}
- public static class InitialConfigurationCommand extends CommandBase {
+ public static class InitialConfigurationCommand extends GetCommand {
public InitialConfigurationCommand() {
super(Arrays.asList("initial_configuration", "icfg"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.put("initial_configuration", zkServer.getInitialConfig());
return response;
@@ -356,14 +389,14 @@ public class Commands {
* Is this server in read-only mode. Returned map contains:
* - "is_read_only": Boolean
*/
- public static class IsroCommand extends CommandBase {
+ public static class IsroCommand extends GetCommand {
public IsroCommand() {
super(Arrays.asList("is_read_only", "isro"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
response.put("read_only", zkServer instanceof ReadOnlyZooKeeperServer);
return response;
@@ -380,14 +413,14 @@ public class Commands {
* - "zxid": String
* - "timestamp": Long
*/
- public static class LastSnapshotCommand extends CommandBase {
+ public static class LastSnapshotCommand extends GetCommand {
public LastSnapshotCommand() {
super(Arrays.asList("last_snapshot", "lsnp"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
SnapshotInfo info = zkServer.getTxnLogFactory().getLastSnapshotInfo();
response.put("zxid", Long.toHexString(info == null ? -1L : info.zxid));
@@ -400,14 +433,14 @@ public class Commands {
/**
* Returns the leader status of this instance and the leader host string.
*/
- public static class LeaderCommand extends CommandBase {
+ public static class LeaderCommand extends GetCommand {
public LeaderCommand() {
super(Arrays.asList("leader", "lead"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
if (zkServer instanceof QuorumZooKeeperServer) {
response.put("is_leader", zkServer instanceof LeaderZooKeeperServer);
@@ -450,14 +483,14 @@ public class Commands {
* - "synced_followers": Integer (leader only)
* - "pending_syncs": Integer (leader only)
*/
- public static class MonitorCommand extends CommandBase {
+ public static class MonitorCommand extends GetCommand {
public MonitorCommand() {
super(Arrays.asList("monitor", "mntr"), false);
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
zkServer.dumpMonitorValues(response::put);
ServerMetrics.getMetrics().getMetricsProvider().dump(response::put);
@@ -470,14 +503,14 @@ public class Commands {
/**
* Reset all observer connection statistics.
*/
- public static class ObserverCnxnStatResetCommand extends CommandBase {
+ public static class ObserverCnxnStatResetCommand extends GetCommand {
public ObserverCnxnStatResetCommand() {
super(Arrays.asList("observer_connection_stat_reset", "orst"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
if (zkServer instanceof LeaderZooKeeperServer) {
Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader();
@@ -492,16 +525,80 @@ public class Commands {
}
/**
+ * Restore from snapshot on the current server.
+ *
+ * Returned map contains:
+ * - "last_zxid": String
+ */
+ public static class RestoreCommand extends PostCommand {
+ static final String RESPONSE_DATA_LAST_ZXID = "last_zxid";
+
+ static final String ADMIN_RESTORE_ENABLED = "zookeeper.admin.restore.enabled";
+
+
+ private RateLimiter rateLimiter;
+
+ public RestoreCommand() {
+ super(Arrays.asList("restore", "rest"));
+ rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS);
+ }
+
+ @Override
+ public CommandResponse runPost(final ZooKeeperServer zkServer, final InputStream inputStream) {
+ final CommandResponse response = initializeResponse();
+
+ // check feature flag
+ final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "false"));
+ if (!restoreEnabled) {
+ response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ LOG.warn("Restore command is disabled");
+ return response;
+ }
+
+ if (!zkServer.isSerializeLastProcessedZxidEnabled()) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ LOG.warn("Restore command requires serializeLastProcessedZxidEnable flag is set to true");
+ return response;
+ }
+
+ if (inputStream == null){
+ response.setStatusCode(HttpServletResponse.SC_BAD_REQUEST);
+ LOG.warn("InputStream from restore request is null");
+ return response;
+ }
+
+ // check rate limiting
+ if (!rateLimiter.allow()) {
+ response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429);
+ ServerMetrics.getMetrics().RESTORE_RATE_LIMITED_COUNT.add(1);
+ LOG.warn("Restore request was rate limited");
+ return response;
+ }
+
+ // restore from snapshot InputStream
+ try {
+ final long lastZxid = zkServer.restoreFromSnapshot(inputStream);
+ response.put(RESPONSE_DATA_LAST_ZXID, lastZxid);
+ } catch (final Exception e) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ServerMetrics.getMetrics().RESTORE_ERROR_COUNT.add(1);
+ LOG.warn("Exception occurred when restore snapshot via the restore command", e);
+ }
+ return response;
+ }
+ }
+
+ /**
* No-op command, check if the server is running
*/
- public static class RuokCommand extends CommandBase {
+ public static class RuokCommand extends GetCommand {
public RuokCommand() {
super(Arrays.asList("ruok"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
return initializeResponse();
}
@@ -513,14 +610,14 @@ public class Commands {
* Returned Map contains:
* - "tracemask": Long
*/
- public static class SetTraceMaskCommand extends CommandBase {
+ public static class SetTraceMaskCommand extends GetCommand {
public SetTraceMaskCommand() {
super(Arrays.asList("set_trace_mask", "stmk"), false);
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
long traceMask;
if (!kwargs.containsKey("traceMask")) {
@@ -551,28 +648,25 @@ public class Commands {
* - "last_zxid": String
* - "snapshot_size": String
*/
- public static class SnapshotCommand extends CommandBase {
+ public static class SnapshotCommand extends GetCommand {
static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
- static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS";
-
- private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000"));
private final RateLimiter rateLimiter;
public SnapshotCommand() {
super(Arrays.asList("snapshot", "snap"));
- rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS);
+ rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS);
}
@SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
@Override
- public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
+ public CommandResponse runGet(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
final CommandResponse response = initializeResponse();
// check feature flag
@@ -637,7 +731,7 @@ public class Commands {
* - "server_stats": ServerStats object
* - "node_count": Integer
*/
- public static class SrvrCommand extends CommandBase {
+ public static class SrvrCommand extends GetCommand {
public SrvrCommand() {
super(Arrays.asList("server_stats", "srvr"));
@@ -649,7 +743,7 @@ public class Commands {
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
LOG.info("running stat");
response.put("version", Version.getFullVersion());
@@ -676,8 +770,8 @@ public class Commands {
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
- CommandResponse response = super.run(zkServer, kwargs);
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ CommandResponse response = super.runGet(zkServer, kwargs);
final Iterable<Map<String, Object>> connections;
if (zkServer.getServerCnxnFactory() != null) {
@@ -702,14 +796,14 @@ public class Commands {
/**
* Resets server statistics.
*/
- public static class StatResetCommand extends CommandBase {
+ public static class StatResetCommand extends GetCommand {
public StatResetCommand() {
super(Arrays.asList("stat_reset", "srst"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
zkServer.serverStats().reset();
return response;
@@ -723,14 +817,14 @@ public class Commands {
* - "observers": list of observer learner handler info objects (leader/follower only)
* @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo()
*/
- public static class SyncedObserverConsCommand extends CommandBase {
+ public static class SyncedObserverConsCommand extends GetCommand {
public SyncedObserverConsCommand() {
super(Arrays.asList("observers", "obsr"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
@@ -760,14 +854,14 @@ public class Commands {
/**
* All defined system properties.
*/
- public static class SystemPropertiesCommand extends CommandBase {
+ public static class SystemPropertiesCommand extends GetCommand {
public SystemPropertiesCommand() {
super(Arrays.asList("system_properties", "sysp"), false);
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
Properties systemProperties = System.getProperties();
SortedMap<String, String> sortedSystemProperties = new TreeMap<>();
@@ -782,14 +876,14 @@ public class Commands {
* Returns the current ensemble configuration information.
* It provides list of current voting members in the ensemble.
*/
- public static class VotingViewCommand extends CommandBase {
+ public static class VotingViewCommand extends GetCommand {
public VotingViewCommand() {
super(Arrays.asList("voting_view"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
if (zkServer instanceof QuorumZooKeeperServer) {
QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
@@ -850,14 +944,14 @@ public class Commands {
* @see DataTree#getWatches()
* @see DataTree#getWatches()
*/
- public static class WatchCommand extends CommandBase {
+ public static class WatchCommand extends GetCommand {
public WatchCommand() {
super(Arrays.asList("watches", "wchc"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
DataTree dt = zkServer.getZKDatabase().getDataTree();
CommandResponse response = initializeResponse();
response.put("session_id_to_watched_paths", dt.getWatches().toMap());
@@ -871,14 +965,14 @@ public class Commands {
* - "path_to_session_ids": Map&lt;String, Set&lt;Long&gt;&gt; path -&gt; session IDs of sessions watching path
* @see DataTree#getWatchesByPath()
*/
- public static class WatchesByPathCommand extends CommandBase {
+ public static class WatchesByPathCommand extends GetCommand {
public WatchesByPathCommand() {
super(Arrays.asList("watches_by_path", "wchp"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
DataTree dt = zkServer.getZKDatabase().getDataTree();
CommandResponse response = initializeResponse();
response.put("path_to_session_ids", dt.getWatchesByPath().toMap());
@@ -891,14 +985,14 @@ public class Commands {
* Summarized watch information.
* @see DataTree#getWatchesSummary()
*/
- public static class WatchSummaryCommand extends CommandBase {
+ public static class WatchSummaryCommand extends GetCommand {
public WatchSummaryCommand() {
super(Arrays.asList("watch_summary", "wchs"));
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
DataTree dt = zkServer.getZKDatabase().getDataTree();
CommandResponse response = initializeResponse();
response.putAll(dt.getWatchesSummary().toMap());
@@ -911,14 +1005,14 @@ public class Commands {
* Returns the current phase of Zab protocol that peer is running.
* It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
*/
- public static class ZabStateCommand extends CommandBase {
+ public static class ZabStateCommand extends GetCommand {
public ZabStateCommand() {
super(Arrays.asList("zabstate"), false);
}
@Override
- public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
CommandResponse response = initializeResponse();
if (zkServer instanceof QuorumZooKeeperServer) {
QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
new file mode 100644
index 000000000..509cad72e
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.io.InputStream;
+import java.util.List;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+/**
+ * Command that represents HTTP GET request
+ */
+
+public abstract class GetCommand extends CommandBase {
+
+ protected GetCommand(List<String> names) {
+ super(names);
+ }
+
+ protected GetCommand(List<String> names, boolean serverRequired) {
+ super(names, serverRequired);
+ }
+
+ @Override
+ public CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream) {
+ return null;
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
index 3c82f8552..effececdc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
@@ -236,6 +236,7 @@ public class JettyAdminServer implements AdminServer {
private static final long serialVersionUID = 1L;
+ @Override
protected void doGet(
HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
@@ -260,7 +261,7 @@ public class JettyAdminServer implements AdminServer {
}
// Run the command
- final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+ final CommandResponse cmdResponse = Commands.runGetCommand(cmd, zkServer, kwargs);
response.setStatus(cmdResponse.getStatusCode());
final Map<String, String> headers = cmdResponse.getHeaders();
@@ -280,6 +281,60 @@ public class JettyAdminServer implements AdminServer {
outputter.output(cmdResponse, response.getOutputStream());
}
}
+
+ /**
+ * Serves HTTP POST requests. It reads request payload as raw data.
+ * It's up to each command to process the payload accordingly.
+ * For example, RestoreCommand uses the payload InputStream directly
+ * to read snapshot data.
+ */
+ @Override
+ protected void doPost(final HttpServletRequest request,
+ final HttpServletResponse response) throws ServletException, IOException {
+ final String cmdName = extractCommandNameFromURL(request, response);
+ if (cmdName != null) {
+ final CommandResponse cmdResponse = Commands.runPostCommand(cmdName, zkServer, request.getInputStream());
+ final String clientIP = IPAuthenticationProvider.getClientIPAddress(request);
+ sendJSONResponse(response, cmdResponse, clientIP);
+ }
+ }
+
+ /**
+ * Extracts the command name from URL if it exists otherwise null
+ */
+ private String extractCommandNameFromURL(final HttpServletRequest request,
+ final HttpServletResponse response) throws IOException {
+ String cmd = request.getPathInfo();
+ if (cmd == null || cmd.equals("/")) {
+ printCommandLinks(response);
+ return null;
+ }
+ // Strip leading "/"
+ return cmd.substring(1);
+ }
+
+ /**
+ * Prints the list of URLs to each registered command as response.
+ */
+ private void printCommandLinks(final HttpServletResponse response) throws IOException {
+ for (final String link : commandLinks()) {
+ response.getWriter().println(link);
+ response.getWriter().println("<br/>");
+ }
+ }
+
+ /**
+ * Send JSON string as the response.
+ */
+ private void sendJSONResponse(final HttpServletResponse response,
+ final CommandResponse cmdResponse,
+ final String clientIP) throws IOException {
+ final CommandOutputter outputter = new JsonOutputter(clientIP);
+
+ response.setStatus(cmdResponse.getStatusCode());
+ response.setContentType(outputter.getContentType());
+ outputter.output(cmdResponse, response.getWriter());
+ }
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
new file mode 100644
index 000000000..cb346b2e0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Command that represents HTTP POST request
+ */
+package org.apache.zookeeper.server.admin;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+public abstract class PostCommand extends CommandBase {
+ protected PostCommand(List<String> names) {
+ super(names);
+ }
+
+ @Override
+ public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+ return null;
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index 1a91d1c30..f162f0cc6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -99,7 +99,7 @@ public class FileSnap implements SnapShot {
SnapStream.checkSealIntegrity(snapIS, ia);
}
- // deserialize the last processed zxid and check the intact
+ // deserialize lastProcessedZxid and check inconsistency
if (dt.deserializeLastProcessedZxid(ia)) {
SnapStream.checkSealIntegrity(snapIS, ia);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
index 9bc8ee519..847b12c28 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
@@ -170,7 +170,7 @@ public class SnapStream {
* the checkSum of the content.
*
*/
- static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
+ public static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
long checkSum = is.getChecksum().getValue();
long val = ia.readLong("val");
ia.readString("path"); // Read and ignore "/" written by SealStream.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index 826a875b4..8d9430e1c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -72,8 +72,8 @@ public class ZKTestCase {
// same port.
System.setProperty("zookeeper.admin.enableServer", "false");
- // disable rate limiting on the snapshot admin API
- System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0");
+ // disable rate limiting
+ System.setProperty("zookeeper.admin.rateLimiterIntervalInMS", "0");
// ZOOKEEPER-2693 disables all 4lw by default.
// Here we enable the 4lw which ZooKeeper tests depends.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java
new file mode 100644
index 000000000..85f514dd7
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX;
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.util.Set;
+import java.util.zip.CheckedInputStream;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.persistence.SnapStream;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class ZookeeperServerRestoreTest extends ZKTestCase {
+ private static final String BASE_PATH = "/restoreFromSnapshotTest";
+ private static final int NODE_COUNT = 10;
+ private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique();
+
+ @TempDir
+ static File dataDir;
+
+ @TempDir
+ static File logDir;
+
+ @Test
+ public void testRestoreFromSnapshot() throws Exception {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+
+ final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+ final int port = Integer.parseInt(HOST_PORT.split(":")[1]);
+ final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+
+ ZooKeeper zk1 = null;
+ ZooKeeper zk2 = null;
+ ZooKeeper zk3 = null;
+
+ try {
+ // start the server
+ serverCnxnFactory.startup(zks);
+ assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
+
+ // zk1 create test data
+ zk1 = ClientBase.createZKClient(HOST_PORT);
+ for (int i = 0; i < NODE_COUNT; i++) {
+ final String path = BASE_PATH + "-" + i;
+ zk1.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ // take Snapshot
+ final File snapshotFile = zks.takeSnapshot(false, false, true);
+ final long lastZxidFromSnapshot = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX);
+
+ // zk2 create more test data after snapshotting
+ zk2 = ClientBase.createZKClient(HOST_PORT);
+ for (int i = NODE_COUNT; i < NODE_COUNT * 2; i++) {
+ final String path = BASE_PATH + "-" + i;
+ zk2.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ // restore from snapshot
+ try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile)) {
+ final long lastZxidFromRestore = zks.restoreFromSnapshot(is);
+
+ // validate the last processed zxid
+ assertEquals(lastZxidFromSnapshot, lastZxidFromRestore);
+
+ // validate restored data only contains data from snapshot
+ zk3 = ClientBase.createZKClient(HOST_PORT);
+ for (int i = 0; i < NODE_COUNT; i++) {
+ final String path = BASE_PATH + "-" + i;
+ final String expectedData = String.valueOf(i);
+ assertArrayEquals(expectedData.getBytes(), zk3.getData(path, null, null));
+ }
+ assertEquals(NODE_COUNT + 3, zk3.getAllChildrenNumber("/"));
+
+ // validate sessions
+ final SessionTracker sessionTracker = zks.getSessionTracker();
+ final Set<Long> globalSessions = sessionTracker.globalSessions();
+ assertEquals(2, globalSessions.size());
+ assertTrue(globalSessions.contains(zk1.getSessionId()));
+ Assertions.assertFalse(globalSessions.contains(zk2.getSessionId()));
+ assertTrue(globalSessions.contains(zk3.getSessionId()));
+
+ // validate ZookeeperServer state
+ assertEquals(ZooKeeperServer.State.RUNNING, zks.state);
+
+ // validate being able to create more data after restore
+ zk3.create(BASE_PATH + "_" + "after", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEquals(NODE_COUNT + 4, zk3.getAllChildrenNumber("/"));
+ }
+ } finally {
+ System.clearProperty("zookeeper.serializeLastProcessedZxid.enabled");
+
+ if (zk1 != null) {
+ zk1.close();
+ }
+ if (zk2 != null) {
+ zk2.close();
+ }
+ if (zk3 != null) {
+ zk3.close();
+ }
+
+ zks.shutdown();
+ serverCnxnFactory.shutdown();
+ }
+ }
+
+ @Test
+ public void testRestoreFromSnapshot_nulInputStream() throws Exception {
+ final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+ assertThrows(IllegalArgumentException.class, () -> zks.restoreFromSnapshot(null));
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java
index a9953b7c0..c9d74a4cb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java
@@ -16,26 +16,26 @@
* limitations under the License.
*/
-package org.apache.zookeeper;
+package org.apache.zookeeper.server;
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-public class TakeSnapshotTest extends ClientBase {
+public class ZookeeperServerSnapshotTest extends ZKTestCase {
private static final String BASE_PATH = "/takeSnapshotTest";
- private static final int NODE_COUNT = 100;
- private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
- private ZooKeeper zk;
+ private static final int NODE_COUNT = 10;
+ private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique();
@TempDir
static File dataDir;
@@ -43,32 +43,19 @@ public class TakeSnapshotTest extends ClientBase {
@TempDir
static File logDir;
-
- @BeforeEach
- public void setUp() throws Exception {
- super.setUp();
- ClientBase.setupTestEnv();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- if (zk != null) {
- zk.close();
- }
- }
-
@Test
- public void testTakeSnapshotAndRestore() throws Exception {
+ public void testTakeSnapshot() throws Exception {
ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
- final int port = Integer.parseInt(HOSTPORT.split(":")[1]);
+ final int port = Integer.parseInt(HOST_PORT.split(":")[1]);
final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
- serverCnxnFactory.startup(zks);
- assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+ ZooKeeper zk = null;
+ try {
+ serverCnxnFactory.startup(zks);
+ assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
- try {
- zk = ClientBase.createZKClient(HOSTPORT);
+ zk = ClientBase.createZKClient(HOST_PORT);
for (int i = 0; i < NODE_COUNT; i++) {
final String path = BASE_PATH + "-" + i;
zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -81,14 +68,14 @@ public class TakeSnapshotTest extends ClientBase {
zk.close();
zks.shutdown();
- // start server again and assert the data restored from snapshot
+ // restart server and assert the data restored from snapshot
zks = new ZooKeeperServer(dataDir, logDir, 3000);
ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
serverCnxnFactory.startup(zks);
- assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+ assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
- zk = ClientBase.createZKClient(HOSTPORT);
+ zk = ClientBase.createZKClient(HOST_PORT);
for (int i = 0; i < NODE_COUNT; i++) {
final String path = BASE_PATH + "-" + i;
final String expectedData = String.valueOf(i);
@@ -96,6 +83,10 @@ public class TakeSnapshotTest extends ClientBase {
}
assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/"));
} finally {
+ if (zk != null) {
+ zk.close();
+ }
+
zks.shutdown();
serverCnxnFactory.shutdown();
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
index 2d74776e3..c60ae868d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
@@ -19,8 +19,9 @@
package org.apache.zookeeper.server.admin;
import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
-import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -57,6 +59,8 @@ public class CommandsTest extends ClientBase {
* - the primary name of the command
* @param kwargs
* - keyword arguments to the command
+ * @param inputStream
+ * - InputStream to the command
* @param expectedHeaders
* - expected HTTP response headers
* @param expectedStatusCode
@@ -66,11 +70,12 @@ public class CommandsTest extends ClientBase {
* @throws IOException
* @throws InterruptedException
*/
- private void testCommand(String cmdName, Map<String, String> kwargs,
+ private void testCommand(String cmdName, Map<String, String> kwargs, InputStream inputStream,
Map<String, String> expectedHeaders, int expectedStatusCode,
Field... fields) throws IOException, InterruptedException {
ZooKeeperServer zks = serverFactory.getZooKeeperServer();
- final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs);
+ final CommandResponse commandResponse = inputStream == null
+ ? Commands.runGetCommand(cmdName, zks, kwargs) : Commands.runPostCommand(cmdName, zks, inputStream);
assertNotNull(commandResponse);
assertEquals(expectedStatusCode, commandResponse.getStatusCode());
try (final InputStream responseStream = commandResponse.getInputStream()) {
@@ -102,7 +107,7 @@ public class CommandsTest extends ClientBase {
}
public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
- testCommand(cmdName, new HashMap<String, String>(), new HashMap<>(), HttpServletResponse.SC_OK, fields);
+ testCommand(cmdName, new HashMap<String, String>(), null, new HashMap<>(), HttpServletResponse.SC_OK, fields);
}
private static class Field {
@@ -116,16 +121,6 @@ public class CommandsTest extends ClientBase {
}
@Test
- public void testSnapshot_streaming() throws IOException, InterruptedException {
- testSnapshot(true);
- }
-
- @Test
- public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
- testSnapshot(false);
- }
-
- @Test
public void testConfiguration() throws IOException, InterruptedException {
testCommand("configuration", new Field("client_port", Integer.class), new Field("data_dir", String.class), new Field("data_log_dir", String.class), new Field("tick_time", Integer.class), new Field("max_client_cnxns", Integer.class), new Field("min_session_timeout", Integer.class), new Field("max_session_timeout", Integer.class), new Field("server_id", Long.class), new Field("client_port_listen_backlog", Integer.class));
}
@@ -231,6 +226,45 @@ public class CommandsTest extends ClientBase {
}
@Test
+ public void testRestore_invalidInputStream() throws IOException, InterruptedException {
+ setupForRestoreCommand();
+
+ try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes())){
+ final Map<String, String> kwargs = new HashMap<>();
+ final Map<String, String> expectedHeaders = new HashMap<>();
+ testCommand("restore", kwargs, inputStream, expectedHeaders, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ } finally {
+ clearForRestoreCommand();
+ }
+ }
+
+ @Test
+ public void testRestore_nullInputStream() {
+ setupForRestoreCommand();
+ final ZooKeeperServer zks = serverFactory.getZooKeeperServer();
+ try {
+ final CommandResponse commandResponse = Commands.runPostCommand("restore", zks, null);
+ assertNotNull(commandResponse);
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, commandResponse.getStatusCode());
+ } finally {
+ clearForRestoreCommand();
+ if (zks != null) {
+ zks.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testSnapshot_streaming() throws IOException, InterruptedException {
+ testSnapshot(true);
+ }
+
+ @Test
+ public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
+ testSnapshot(false);
+ }
+
+ @Test
public void testServerStats() throws IOException, InterruptedException {
testCommand("server_stats", new Field("version", String.class), new Field("read_only", Boolean.class), new Field("server_stats", ServerStats.class), new Field("node_count", Integer.class), new Field("client_response", BufferStats.class));
}
@@ -239,7 +273,7 @@ public class CommandsTest extends ClientBase {
public void testSetTraceMask() throws IOException, InterruptedException {
Map<String, String> kwargs = new HashMap<String, String>();
kwargs.put("traceMask", "1");
- testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
+ testCommand("set_trace_mask", kwargs, null, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
}
@Test
@@ -289,7 +323,7 @@ public class CommandsTest extends ClientBase {
when(zkServer.getSecureServerCnxnFactory()).thenReturn(cnxnFactory);
// Act
- CommandResponse response = cmd.run(zkServer, null);
+ CommandResponse response = cmd.runGet(zkServer, null);
// Assert
assertThat(response.toMap().containsKey("connections"), is(true));
@@ -313,7 +347,7 @@ public class CommandsTest extends ClientBase {
when(zkServer.getZKDatabase()).thenReturn(zkDatabase);
when(zkDatabase.getNodeCount()).thenReturn(0);
- CommandResponse response = cmd.run(zkServer, null);
+ CommandResponse response = cmd.runGet(zkServer, null);
assertThat(response.toMap().containsKey("connections"), is(true));
assertThat(response.toMap().containsKey("secure_connections"), is(true));
@@ -321,7 +355,7 @@ public class CommandsTest extends ClientBase {
private void testSnapshot(final boolean streaming) throws IOException, InterruptedException {
System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
- System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+ System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
try {
final Map<String, String> kwargs = new HashMap<>();
@@ -329,12 +363,23 @@ public class CommandsTest extends ClientBase {
final Map<String, String> expectedHeaders = new HashMap<>();
expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
- testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK);
+ testCommand("snapshot", kwargs, null, expectedHeaders, HttpServletResponse.SC_OK);
} finally {
System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
- System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+ System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
}
}
+ private void setupForRestoreCommand() {
+ System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+ System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
+ System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+ }
+
+ private void clearForRestoreCommand() {
+ System.clearProperty(ADMIN_RESTORE_ENABLED);
+ System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
+ System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+ }
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
index b06cde6fd..41f00751a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
@@ -63,7 +63,7 @@ public class JettyAdminServerTest extends ZKTestCase {
static final String URL_FORMAT = "http://localhost:%d/commands";
static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
- static final int jettyAdminPort = PortAssignment.unique();
+ private final int jettyAdminPort = PortAssignment.unique();
@BeforeEach
public void enableServer() {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
index 5b5dbd979..f7d357e72 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
@@ -19,20 +19,22 @@
package org.apache.zookeeper.server.admin;
import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
-import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT;
-import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -40,6 +42,11 @@ import java.net.URLEncoder;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CheckedInputStream;
import javax.servlet.http.HttpServletResponse;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -48,8 +55,11 @@ import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.IOUtils;
+import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.SnapStream;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -60,13 +70,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class SnapshotCommandTest extends ZKTestCase {
- private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class);
+public class SnapshotAndRestoreCommandTest extends ZKTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotAndRestoreCommandTest.class);
- private static final String PATH = "/snapshot_test";
+ private static final String SNAPSHOT_TEST_PATH = "/snapshot_test";
private static final int NODE_COUNT = 10;
- private final String hostPort = "127.0.0.1:" + PortAssignment.unique();
+ private final String hostPort = "127.0.0.1:" + PortAssignment.unique();
+ private final int jettyAdminPort = PortAssignment.unique();
private ServerCnxnFactory cnxnFactory;
private JettyAdminServer adminServer;
private ZooKeeperServer zks;
@@ -91,9 +102,10 @@ public class SnapshotCommandTest extends ZKTestCase {
// start AdminServer
System.setProperty("zookeeper.admin.enableServer", "true");
System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
+ System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
- System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+ System.setProperty(ADMIN_RESTORE_ENABLED, "true");
adminServer = new JettyAdminServer();
adminServer.setZooKeeperServer(zks);
@@ -101,7 +113,7 @@ public class SnapshotCommandTest extends ZKTestCase {
// create Zookeeper client and test data
zk = ClientBase.createZKClient(hostPort);
- createData(zk, NODE_COUNT);
+ createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT);
}
@AfterAll
@@ -109,9 +121,10 @@ public class SnapshotCommandTest extends ZKTestCase {
System.clearProperty("zookeeper.4lw.commands.whitelist");
System.clearProperty("zookeeper.admin.enableServer");
System.clearProperty("zookeeper.admin.serverPort");
+ System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
- System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+ System.clearProperty(ADMIN_RESTORE_ENABLED);
if (zk != null) {
zk.close();
@@ -131,19 +144,85 @@ public class SnapshotCommandTest extends ZKTestCase {
}
@Test
- public void testSnapshotCommand_streaming() throws Exception {
- // take snapshot with streaming
- final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+ public void testSnapshotAndRestoreCommand_streaming() throws Exception {
+ ServerMetrics.getMetrics().resetAll();
- // validate snapshot response
- assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
- validateResponseHeaders(snapshotConn);
- final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
- try (final InputStream inputStream = snapshotConn.getInputStream();
- final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
- IOUtils.copyBytes(inputStream, outputStream, 1024, true);
- final long fileSize = Files.size(snapshotFile.toPath());
- assertTrue(fileSize > 0);
+ // take snapshot with streaming and validate
+ final File snapshotFile = takeSnapshotAndValidate();
+
+ // validate snapshot metrics
+ validateSnapshotMetrics();
+
+ // restore from snapshot and validate
+ performRestoreAndValidate(snapshotFile);
+
+ // validate creating data after restore
+ try (final ZooKeeper zk = ClientBase.createZKClient(hostPort)) {
+ createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT + 1);
+ assertEquals(NODE_COUNT + NODE_COUNT + 1, zk.getAllChildrenNumber(SNAPSHOT_TEST_PATH));
+ }
+
+ // validate restore metrics
+ validateRestoreMetrics();
+ }
+
+ @Test
+ public void testClientRequest_restoreInProgress() throws Exception {
+ final int threadCount = 2;
+ final int nodeCount = 50;
+ final String restoreTestPath = "/restore_test";
+
+ // take snapshot
+ final File snapshotFile = takeSnapshotAndValidate();
+
+ final ExecutorService service = Executors.newFixedThreadPool(threadCount);
+ final CountDownLatch latch = new CountDownLatch(threadCount);
+ final AtomicBoolean createSucceeded = new AtomicBoolean(false);
+ final AtomicBoolean restoreSucceeded = new AtomicBoolean(false);
+
+ // thread 1 creates data
+ service.submit(() -> {
+ try {
+ createData(zk, restoreTestPath, nodeCount);
+ createSucceeded.set(true);
+ } catch (final Exception e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ // thread 2 performs restore operation
+ service.submit(() -> {
+ try {
+ performRestoreAndValidate(snapshotFile);
+ restoreSucceeded.set(true);
+ } catch (final Exception e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ // wait for operations completed
+ latch.await();
+
+ // validate all client requests succeeded
+ if (createSucceeded.get() && restoreSucceeded.get()) {
+ assertEquals(nodeCount, zk.getAllChildrenNumber(restoreTestPath));
+ }
+ }
+
+ @Test
+ public void testRestores() throws Exception {
+ // take snapshot
+ final File snapshotFile = takeSnapshotAndValidate();
+
+ // perform restores
+ for (int i = 0; i < 3; i++) {
+ performRestoreAndValidate(snapshotFile);
}
}
@@ -186,16 +265,47 @@ public class SnapshotCommandTest extends ZKTestCase {
}
}
- private void createData(final ZooKeeper zk, final long count) throws Exception {
+ @Test
+ public void testRestoreCommand_disabled() throws Exception {
+ System.setProperty(ADMIN_RESTORE_ENABLED, "false");
+ try {
+ final HttpURLConnection restoreConn = sendRestoreRequest();
+ assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, restoreConn.getResponseCode());
+ } finally {
+ System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+ }
+ }
+
+ @Test
+ public void testRestoreCommand_serializeLastZxidDisabled() throws Exception {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+ try {
+ final HttpURLConnection restoreConn = sendRestoreRequest();
+ assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
+ } finally {
+ ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+ }
+ }
+
+ @Test
+ public void testRestoreCommand_invalidSnapshotData() throws Exception {
+ final HttpURLConnection restoreConn = sendRestoreRequest();
+ try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes());
+ final OutputStream outputStream = restoreConn.getOutputStream()) {
+ IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+ }
+ assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
+ }
+
+ private void createData(final ZooKeeper zk, final String parentPath, final long count) throws Exception {
try {
- zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create(parentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (final KeeperException.NodeExistsException ignore) {
// ignore
}
for (int i = 0; i < count; i++) {
- final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- LOG.info("Node created. path={}" + processNodePath);
+ zk.create(String.format("%s/%s", parentPath, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
@@ -208,6 +318,15 @@ public class SnapshotCommandTest extends ZKTestCase {
return snapshotConn;
}
+ private HttpURLConnection sendRestoreRequest() throws Exception {
+ final URL restoreURL = new URL(String.format(URL_FORMAT + "/restore", jettyAdminPort));
+ final HttpURLConnection restoreConn = (HttpURLConnection) restoreURL.openConnection();
+ restoreConn.setDoOutput(true);
+ restoreConn.setRequestMethod("POST");
+
+ return restoreConn;
+ }
+
private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
final Map<String, String> parameters = new HashMap<>();
parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
@@ -254,4 +373,47 @@ public class SnapshotCommandTest extends ZKTestCase {
LOG.info("Response payload: {}", sb);
}
}
+
+ private void validateSnapshotMetrics() {
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+ assertEquals(0, (long) metrics.get("snapshot_error_count"));
+ assertEquals(0, (long) metrics.get("snapshot_rate_limited_count"));
+ assertTrue((Double) metrics.get("avg_snapshottime") > 0.0);
+ }
+
+ private void validateRestoreMetrics() {
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+ assertEquals(0, (long) metrics.get("restore_error_count"));
+ assertEquals(0, (long) metrics.get("restore_rate_limited_count"));
+ assertTrue((Double) metrics.get("avg_restore_time") > 0.0);
+ }
+
+ private File takeSnapshotAndValidate() throws Exception {
+ // take snapshot with streaming
+ final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+ // validate snapshot response
+ assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+ validateResponseHeaders(snapshotConn);
+ final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
+ try (final InputStream inputStream = snapshotConn.getInputStream();
+ final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
+ IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+ final long fileSize = Files.size(snapshotFile.toPath());
+ assertTrue(fileSize > 0);
+ }
+ return snapshotFile;
+ }
+
+ private void performRestoreAndValidate(final File snapshotFile) throws Exception {
+ // perform restore
+ final HttpURLConnection restoreConn = sendRestoreRequest();
+ try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile);
+ final OutputStream outputStream = restoreConn.getOutputStream()) {
+ IOUtils.copyBytes(is, outputStream, 1024, true);
+ }
+ // validate restore response
+ assertEquals(HttpURLConnection.HTTP_OK, restoreConn.getResponseCode());
+ displayResponsePayload(restoreConn);
+ }
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
index ffd659978..573d361ae 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
@@ -293,7 +293,7 @@ public class ObserverMasterTest extends ObserverMasterTestBase {
// test stats collection
final Map<String, String> emptyMap = Collections.emptyMap();
- Map<String, Object> stats = Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+ Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id");
// check the stats for the first peer