diff options
author | li4wang <68786536+li4wang@users.noreply.github.com> | 2023-01-26 09:12:14 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-26 18:12:14 +0100 |
commit | d35bdfb9d3a4bc3d94f22785604418d4f650365d (patch) | |
tree | 6b53c3ac8299a87b1b10fad39a9b0907d589ca01 | |
parent | 778c4519e676278d2b76330df6e60032437a9973 (diff) | |
download | zookeeper-d35bdfb9d3a4bc3d94f22785604418d4f650365d.tar.gz |
ZOOKEEPER-4571: Admin server API for restore database from a snapshot (#1961)
Provides a restore command for restoring database from a snapshot
Author: Li Wang <liwang@apple.com>
Co-authored-by: liwang <liwang@apple.com>
18 files changed, 891 insertions, 162 deletions
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index b4c01d35d..636c6cde2 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -2115,16 +2115,22 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t **New in 3.9.0:** The following options are used to configure the [AdminServer](#sc_adminserver). +* *admin.rateLimiterIntervalInMS* : + (Java system property: **zookeeper.admin.rateLimiterIntervalInMS**) + The time interval for rate limiting admin command to protect the server. + Defaults to 5 mins. + * *admin.snapshot.enabled* : (Java system property: **zookeeper.admin.snapshot.enabled**) The flag for enabling the snapshot command. Defaults to false. It will be enabled by default once the auth support for admin server commands is available. - -* *admin.snapshot.intervalInMS* : - (Java system property: **zookeeper.admin.snapshot.intervalInMS**) - The time interval for rate limiting snapshot command to protect the server. - Defaults to 5 mins. + +* *admin.restore.enabled* : + (Java system property: **zookeeper.admin.restore.enabled**) + The flag for enabling the restore command. Defaults to false. + It will be enabled by default once the auth support for admin server commands + is available. **New in 3.7.1:** The following options are used to configure the [AdminServer](#sc_adminserver). @@ -2641,6 +2647,13 @@ Available commands include: Reset all observer connection statistics. Companion command to *observers*. No new fields returned. +* *restore/rest* : + Restore database from snapshot input stream on the current server. + Returns the following data in response payload: + "last_zxid": String + Note: this API is rate-limited (once every 5 mins by default) to protect the server + from being over-loaded. + * *ruok* : No-op command, check if the server is running. A response does not necessarily indicate that the diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index ef28e32e1..e3476ff6d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -74,6 +74,9 @@ public final class ServerMetrics { SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC); SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count"); SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count"); + RESTORE_TIME = metricsContext.getSummary("restore_time", DetailLevel.BASIC); + RESTORE_ERROR_COUNT = metricsContext.getCounter("restore_error_count"); + RESTORE_RATE_LIMITED_COUNT = metricsContext.getCounter("restore_rate_limited_count"); DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC); READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED); UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED); @@ -289,6 +292,21 @@ public final class ServerMetrics { public final Counter SNAPSHOT_RATE_LIMITED_COUNT; /** + * Restore time + */ + public final Summary RESTORE_TIME; + + /** + * Restore error count + */ + public final Counter RESTORE_ERROR_COUNT; + + /** + * Restore rate limited count + */ + public final Counter RESTORE_RATE_LIMITED_COUNT; + + /** * Db init time (snapshot loading + txnlog replay) */ public final Summary DB_INIT_TIME; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index 05ada8cee..f20af88d5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.zip.CheckedInputStream; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; @@ -48,8 +49,10 @@ import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; +import org.apache.zookeeper.server.persistence.FileSnap; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener; +import org.apache.zookeeper.server.persistence.SnapStream; import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator; import org.apache.zookeeper.server.quorum.Leader; import org.apache.zookeeper.server.quorum.Leader.Proposal; @@ -621,6 +624,43 @@ public class ZKDatabase { } /** + * Deserialize a snapshot that contains FileHeader from an input archive. It is used by + * the admin restore command. + * + * @param ia the input archive to deserialize from + * @param is the CheckInputStream to check integrity + * + * @throws IOException + */ + public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException { + clear(); + + // deserialize data tree + final DataTree dataTree = getDataTree(); + final FileSnap filesnap = new FileSnap(snapLog.getSnapDir()); + filesnap.deserialize(dataTree, getSessionWithTimeOuts(), ia); + SnapStream.checkSealIntegrity(is, ia); + + // deserialize digest and check integrity + if (dataTree.deserializeZxidDigest(ia, 0)) { + SnapStream.checkSealIntegrity(is, ia); + } + + // deserialize lastProcessedZxid and check integrity + if (dataTree.deserializeLastProcessedZxid(ia)) { + SnapStream.checkSealIntegrity(is, ia); + } + + // compare the digest to find inconsistency + if (dataTree.getDigestFromLoadedSnapshot() != null) { + dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid); + } + + initialized = true; + } + + + /** * serialize the snapshot * @param oa the output archive to which the snapshot needs to be serialized * @throws IOException diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index d61f269c9..bbb0f7fee 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -19,9 +19,11 @@ package org.apache.zookeeper.server; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -34,11 +36,16 @@ import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; import javax.security.sasl.SaslException; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; import org.apache.zookeeper.KeeperException; @@ -127,6 +134,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; private static boolean closeSessionTxnEnabled = true; + private volatile CountDownLatch restoreLatch; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); @@ -541,12 +549,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public void takeSnapshot() throws IOException { - takeSnapshot(false); + public File takeSnapshot() throws IOException { + return takeSnapshot(false); } - public void takeSnapshot(boolean syncSnap) throws IOException { - takeSnapshot(syncSnap, true, false); + public File takeSnapshot(boolean syncSnap) throws IOException { + return takeSnapshot(syncSnap, true, false); } /** @@ -583,6 +591,61 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return snapFile; } + /** + * Restores database from a snapshot. It is used by the restore admin server command. + * + * @param inputStream input stream of snapshot + * @Return last processed zxid + * @throws IOException + */ + public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { + if (inputStream == null) { + throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); + } + + long start = Time.currentElapsedTime(); + LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", + getZKDatabase().getDataTreeLastProcessedZxid(), + getZKDatabase().dataTree.getNodeCount(), + getZKDatabase().getSessionCount()); + + // restore to a new zkDatabase + final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); + final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); + final InputArchive ia = BinaryInputArchive.getArchive(cis); + newZKDatabase.deserializeSnapshot(ia, cis); + LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", + newZKDatabase.getDataTreeLastProcessedZxid(), + newZKDatabase.dataTree.getNodeCount(), + newZKDatabase.getSessionCount()); + + // create a CountDownLatch + restoreLatch = new CountDownLatch(1); + + try { + // set to the new zkDatabase + setZKDatabase(newZKDatabase); + + // re-create SessionTrack + createSessionTracker(); + } finally { + // unblock request submission + restoreLatch.countDown(); + restoreLatch = null; + } + + LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", + getZKDatabase().getDataTreeLastProcessedZxid(), + getZKDatabase().dataTree.getNodeCount(), + getZKDatabase().getSessionCount()); + + long elapsed = Time.currentElapsedTime() - start; + LOG.info("Restore taken in {} ms", elapsed); + ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); + + return getLastProcessedZxid(); + } + public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); } @@ -826,6 +889,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { * <li>During shutdown the server sets the state to SHUTDOWN, which * corresponds to the server not running.</li></ul> * + * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE + * </li></ul> + * * @param state new server state. */ protected void setState(State state) { @@ -1151,6 +1217,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } public void submitRequest(Request si) { + if (restoreLatch != null) { + try { + LOG.info("Blocking request submission while restore is in progress"); + restoreLatch.await(); + } catch (final InterruptedException e) { + LOG.warn("Unexpected interruption", e); + } + } enqueueRequest(si); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java index b422715bf..5d06356b2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.admin; +import java.io.InputStream; import java.util.Map; import java.util.Set; import org.apache.zookeeper.server.ZooKeeperServer; @@ -57,19 +58,34 @@ public interface Command { boolean isServerRequired(); /** - * Run this command. Commands take a ZooKeeperServer and String-valued - * keyword arguments and return a map containing any information + * Run this command for HTTP GET request. Commands take a ZooKeeperServer, String-valued keyword + * arguments and return a CommandResponse object containing any information * constituting the response to the command. Commands are responsible for * parsing keyword arguments and performing any error handling if necessary. * Errors should be reported by setting the "error" entry of the returned * map with an appropriate message rather than throwing an exception. * - * @param zkServer + * @param zkServer ZooKeeper server * @param kwargs keyword -> argument value mapping - * @return Map representing response to command containing at minimum: + * @return CommandResponse representing response to command containing at minimum: * - "command" key containing the command's primary name * - "error" key containing a String error message or null if no error */ - CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs); + CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs); + /** + * Run this command for HTTP POST. Commands take a ZooKeeperServer and InputStream and + * return a CommandResponse object containing any information + * constituting the response to the command. Commands are responsible for + * parsing keyword arguments and performing any error handling if necessary. + * Errors should be reported by setting the "error" entry of the returned + * map with an appropriate message rather than throwing an exception. + * + * @param zkServer ZooKeeper server + * @param inputStream InputStream from request + * @return CommandResponse representing response to command containing at minimum: + * - "command" key containing the command's primary name + * - "error" key containing a String error message or null if no error + */ + CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 848583a0f..1911b5a57 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.FileInputStream; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; @@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory; public class Commands { static final Logger LOG = LoggerFactory.getLogger(Commands.class); + static final String ADMIN_RATE_LIMITER_INTERVAL = "zookeeper.admin.rateLimiterIntervalInMS"; + private static final long rateLimiterInterval = Integer.parseInt(System.getProperty(ADMIN_RATE_LIMITER_INTERVAL, "300000")); /** Maps command names to Command instances */ private static Map<String, Command> commands = new HashMap<String, Command>(); @@ -100,16 +103,16 @@ public class Commands { * * @param cmdName * @param zkServer - * @param kwargs String-valued keyword arguments to the command + * @param kwargs String-valued keyword arguments to the command from HTTP GET request * (may be null if command requires no additional arguments) * @return Map representing response to command containing at minimum: * - "command" key containing the command's primary name * - "error" key containing a String error message or null if no error */ - public static CommandResponse runCommand( - String cmdName, - ZooKeeperServer zkServer, - Map<String, String> kwargs) { + public static CommandResponse runGetCommand( + String cmdName, + ZooKeeperServer zkServer, + Map<String, String> kwargs) { Command command = getCommand(cmdName); if (command == null) { // set the status code to 200 to keep the current behavior of existing commands @@ -119,7 +122,36 @@ public class Commands { // set the status code to 200 to keep the current behavior of existing commands return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK); } - return command.run(zkServer, kwargs); + return command.runGet(zkServer, kwargs); + } + + /** + * Run the registered command with name cmdName. Commands should not produce + * any exceptions; any (anticipated) errors should be reported in the + * "error" entry of the returned map. Likewise, if no command with the given + * name is registered, this will be noted in the "error" entry. + * + * @param cmdName + * @param zkServer + * @param inputStream InputStream from HTTP POST request + * @return Map representing response to command containing at minimum: + * - "command" key containing the command's primary name + * - "error" key containing a String error message or null if no error + */ + public static CommandResponse runPostCommand( + String cmdName, + ZooKeeperServer zkServer, + InputStream inputStream) { + Command command = getCommand(cmdName); + if (command == null) { + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK); + } + if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) { + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK); + } + return command.runPost(zkServer, inputStream); } /** @@ -152,6 +184,7 @@ public class Commands { registerCommand(new LeaderCommand()); registerCommand(new MonitorCommand()); registerCommand(new ObserverCnxnStatResetCommand()); + registerCommand(new RestoreCommand()); registerCommand(new RuokCommand()); registerCommand(new SetTraceMaskCommand()); registerCommand(new SnapshotCommand()); @@ -170,14 +203,14 @@ public class Commands { /** * Reset all connection statistics. */ - public static class CnxnStatResetCommand extends CommandBase { + public static class CnxnStatResetCommand extends GetCommand { public CnxnStatResetCommand() { super(Arrays.asList("connection_stat_reset", "crst")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); zkServer.getServerCnxnFactory().resetAllConnectionStats(); return response; @@ -190,14 +223,14 @@ public class Commands { * Server configuration parameters. * @see ZooKeeperServer#getConf() */ - public static class ConfCommand extends CommandBase { + public static class ConfCommand extends GetCommand { public ConfCommand() { super(Arrays.asList("configuration", "conf", "config")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.putAll(zkServer.getConf().toMap()); return response; @@ -210,14 +243,14 @@ public class Commands { * - "connections": list of connection info objects * @see org.apache.zookeeper.server.ServerCnxn#getConnectionInfo(boolean) */ - public static class ConsCommand extends CommandBase { + public static class ConsCommand extends GetCommand { public ConsCommand() { super(Arrays.asList("connections", "cons")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); ServerCnxnFactory serverCnxnFactory = zkServer.getServerCnxnFactory(); if (serverCnxnFactory != null) { @@ -239,14 +272,14 @@ public class Commands { /** * Information on ZK datadir and snapdir size in bytes */ - public static class DirsCommand extends CommandBase { + public static class DirsCommand extends GetCommand { public DirsCommand() { super(Arrays.asList("dirs")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("datadir_size", zkServer.getDataDirSize()); response.put("logdir_size", zkServer.getLogDirSize()); @@ -264,14 +297,14 @@ public class Commands { * @see ZooKeeperServer#getSessionExpiryMap() * @see ZooKeeperServer#getEphemerals() */ - public static class DumpCommand extends CommandBase { + public static class DumpCommand extends GetCommand { public DumpCommand() { super(Arrays.asList("dump")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("expiry_time_to_session_ids", zkServer.getSessionExpiryMap()); response.put("session_id_to_ephemeral_paths", zkServer.getEphemerals()); @@ -283,14 +316,14 @@ public class Commands { /** * All defined environment variables. */ - public static class EnvCommand extends CommandBase { + public static class EnvCommand extends GetCommand { public EnvCommand() { super(Arrays.asList("environment", "env", "envi"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); for (Entry e : Environment.list()) { response.put(e.getKey(), e.getValue()); @@ -303,14 +336,14 @@ public class Commands { /** * Digest histories for every specific number of txns. */ - public static class DigestCommand extends CommandBase { + public static class DigestCommand extends GetCommand { public DigestCommand() { super(Arrays.asList("hash")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("digests", zkServer.getZKDatabase().getDataTree().getDigestLog()); return response; @@ -322,14 +355,14 @@ public class Commands { * The current trace mask. Returned map contains: * - "tracemask": Long */ - public static class GetTraceMaskCommand extends CommandBase { + public static class GetTraceMaskCommand extends GetCommand { public GetTraceMaskCommand() { super(Arrays.asList("get_trace_mask", "gtmk"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("tracemask", ZooTrace.getTextTraceLevel()); return response; @@ -337,14 +370,14 @@ public class Commands { } - public static class InitialConfigurationCommand extends CommandBase { + public static class InitialConfigurationCommand extends GetCommand { public InitialConfigurationCommand() { super(Arrays.asList("initial_configuration", "icfg")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("initial_configuration", zkServer.getInitialConfig()); return response; @@ -356,14 +389,14 @@ public class Commands { * Is this server in read-only mode. Returned map contains: * - "is_read_only": Boolean */ - public static class IsroCommand extends CommandBase { + public static class IsroCommand extends GetCommand { public IsroCommand() { super(Arrays.asList("is_read_only", "isro")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("read_only", zkServer instanceof ReadOnlyZooKeeperServer); return response; @@ -380,14 +413,14 @@ public class Commands { * - "zxid": String * - "timestamp": Long */ - public static class LastSnapshotCommand extends CommandBase { + public static class LastSnapshotCommand extends GetCommand { public LastSnapshotCommand() { super(Arrays.asList("last_snapshot", "lsnp")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); SnapshotInfo info = zkServer.getTxnLogFactory().getLastSnapshotInfo(); response.put("zxid", Long.toHexString(info == null ? -1L : info.zxid)); @@ -400,14 +433,14 @@ public class Commands { /** * Returns the leader status of this instance and the leader host string. */ - public static class LeaderCommand extends CommandBase { + public static class LeaderCommand extends GetCommand { public LeaderCommand() { super(Arrays.asList("leader", "lead")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { response.put("is_leader", zkServer instanceof LeaderZooKeeperServer); @@ -450,14 +483,14 @@ public class Commands { * - "synced_followers": Integer (leader only) * - "pending_syncs": Integer (leader only) */ - public static class MonitorCommand extends CommandBase { + public static class MonitorCommand extends GetCommand { public MonitorCommand() { super(Arrays.asList("monitor", "mntr"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); zkServer.dumpMonitorValues(response::put); ServerMetrics.getMetrics().getMetricsProvider().dump(response::put); @@ -470,14 +503,14 @@ public class Commands { /** * Reset all observer connection statistics. */ - public static class ObserverCnxnStatResetCommand extends CommandBase { + public static class ObserverCnxnStatResetCommand extends GetCommand { public ObserverCnxnStatResetCommand() { super(Arrays.asList("observer_connection_stat_reset", "orst")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof LeaderZooKeeperServer) { Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader(); @@ -492,16 +525,80 @@ public class Commands { } /** + * Restore from snapshot on the current server. + * + * Returned map contains: + * - "last_zxid": String + */ + public static class RestoreCommand extends PostCommand { + static final String RESPONSE_DATA_LAST_ZXID = "last_zxid"; + + static final String ADMIN_RESTORE_ENABLED = "zookeeper.admin.restore.enabled"; + + + private RateLimiter rateLimiter; + + public RestoreCommand() { + super(Arrays.asList("restore", "rest")); + rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS); + } + + @Override + public CommandResponse runPost(final ZooKeeperServer zkServer, final InputStream inputStream) { + final CommandResponse response = initializeResponse(); + + // check feature flag + final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "false")); + if (!restoreEnabled) { + response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + LOG.warn("Restore command is disabled"); + return response; + } + + if (!zkServer.isSerializeLastProcessedZxidEnabled()) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + LOG.warn("Restore command requires serializeLastProcessedZxidEnable flag is set to true"); + return response; + } + + if (inputStream == null){ + response.setStatusCode(HttpServletResponse.SC_BAD_REQUEST); + LOG.warn("InputStream from restore request is null"); + return response; + } + + // check rate limiting + if (!rateLimiter.allow()) { + response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429); + ServerMetrics.getMetrics().RESTORE_RATE_LIMITED_COUNT.add(1); + LOG.warn("Restore request was rate limited"); + return response; + } + + // restore from snapshot InputStream + try { + final long lastZxid = zkServer.restoreFromSnapshot(inputStream); + response.put(RESPONSE_DATA_LAST_ZXID, lastZxid); + } catch (final Exception e) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ServerMetrics.getMetrics().RESTORE_ERROR_COUNT.add(1); + LOG.warn("Exception occurred when restore snapshot via the restore command", e); + } + return response; + } + } + + /** * No-op command, check if the server is running */ - public static class RuokCommand extends CommandBase { + public static class RuokCommand extends GetCommand { public RuokCommand() { super(Arrays.asList("ruok")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { return initializeResponse(); } @@ -513,14 +610,14 @@ public class Commands { * Returned Map contains: * - "tracemask": Long */ - public static class SetTraceMaskCommand extends CommandBase { + public static class SetTraceMaskCommand extends GetCommand { public SetTraceMaskCommand() { super(Arrays.asList("set_trace_mask", "stmk"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); long traceMask; if (!kwargs.containsKey("traceMask")) { @@ -551,28 +648,25 @@ public class Commands { * - "last_zxid": String * - "snapshot_size": String */ - public static class SnapshotCommand extends CommandBase { + public static class SnapshotCommand extends GetCommand { static final String REQUEST_QUERY_PARAM_STREAMING = "streaming"; static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid"; static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size"; static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled"; - static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS"; - - private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000")); private final RateLimiter rateLimiter; public SnapshotCommand() { super(Arrays.asList("snapshot", "snap")); - rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS); + rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS); } @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION", justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter") @Override - public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) { + public CommandResponse runGet(final ZooKeeperServer zkServer, final Map<String, String> kwargs) { final CommandResponse response = initializeResponse(); // check feature flag @@ -637,7 +731,7 @@ public class Commands { * - "server_stats": ServerStats object * - "node_count": Integer */ - public static class SrvrCommand extends CommandBase { + public static class SrvrCommand extends GetCommand { public SrvrCommand() { super(Arrays.asList("server_stats", "srvr")); @@ -649,7 +743,7 @@ public class Commands { } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); LOG.info("running stat"); response.put("version", Version.getFullVersion()); @@ -676,8 +770,8 @@ public class Commands { } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { - CommandResponse response = super.run(zkServer, kwargs); + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { + CommandResponse response = super.runGet(zkServer, kwargs); final Iterable<Map<String, Object>> connections; if (zkServer.getServerCnxnFactory() != null) { @@ -702,14 +796,14 @@ public class Commands { /** * Resets server statistics. */ - public static class StatResetCommand extends CommandBase { + public static class StatResetCommand extends GetCommand { public StatResetCommand() { super(Arrays.asList("stat_reset", "srst")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); zkServer.serverStats().reset(); return response; @@ -723,14 +817,14 @@ public class Commands { * - "observers": list of observer learner handler info objects (leader/follower only) * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo() */ - public static class SyncedObserverConsCommand extends CommandBase { + public static class SyncedObserverConsCommand extends GetCommand { public SyncedObserverConsCommand() { super(Arrays.asList("observers", "obsr")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); @@ -760,14 +854,14 @@ public class Commands { /** * All defined system properties. */ - public static class SystemPropertiesCommand extends CommandBase { + public static class SystemPropertiesCommand extends GetCommand { public SystemPropertiesCommand() { super(Arrays.asList("system_properties", "sysp"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); Properties systemProperties = System.getProperties(); SortedMap<String, String> sortedSystemProperties = new TreeMap<>(); @@ -782,14 +876,14 @@ public class Commands { * Returns the current ensemble configuration information. * It provides list of current voting members in the ensemble. */ - public static class VotingViewCommand extends CommandBase { + public static class VotingViewCommand extends GetCommand { public VotingViewCommand() { super(Arrays.asList("voting_view")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self; @@ -850,14 +944,14 @@ public class Commands { * @see DataTree#getWatches() * @see DataTree#getWatches() */ - public static class WatchCommand extends CommandBase { + public static class WatchCommand extends GetCommand { public WatchCommand() { super(Arrays.asList("watches", "wchc")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { DataTree dt = zkServer.getZKDatabase().getDataTree(); CommandResponse response = initializeResponse(); response.put("session_id_to_watched_paths", dt.getWatches().toMap()); @@ -871,14 +965,14 @@ public class Commands { * - "path_to_session_ids": Map<String, Set<Long>> path -> session IDs of sessions watching path * @see DataTree#getWatchesByPath() */ - public static class WatchesByPathCommand extends CommandBase { + public static class WatchesByPathCommand extends GetCommand { public WatchesByPathCommand() { super(Arrays.asList("watches_by_path", "wchp")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { DataTree dt = zkServer.getZKDatabase().getDataTree(); CommandResponse response = initializeResponse(); response.put("path_to_session_ids", dt.getWatchesByPath().toMap()); @@ -891,14 +985,14 @@ public class Commands { * Summarized watch information. * @see DataTree#getWatchesSummary() */ - public static class WatchSummaryCommand extends CommandBase { + public static class WatchSummaryCommand extends GetCommand { public WatchSummaryCommand() { super(Arrays.asList("watch_summary", "wchs")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { DataTree dt = zkServer.getZKDatabase().getDataTree(); CommandResponse response = initializeResponse(); response.putAll(dt.getWatchesSummary().toMap()); @@ -911,14 +1005,14 @@ public class Commands { * Returns the current phase of Zab protocol that peer is running. * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST */ - public static class ZabStateCommand extends CommandBase { + public static class ZabStateCommand extends GetCommand { public ZabStateCommand() { super(Arrays.asList("zabstate"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java new file mode 100644 index 000000000..509cad72e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.admin; + +import java.io.InputStream; +import java.util.List; +import org.apache.zookeeper.server.ZooKeeperServer; + +/** + * Command that represents HTTP GET request + */ + +public abstract class GetCommand extends CommandBase { + + protected GetCommand(List<String> names) { + super(names); + } + + protected GetCommand(List<String> names, boolean serverRequired) { + super(names, serverRequired); + } + + @Override + public CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream) { + return null; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java index 3c82f8552..effececdc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java @@ -236,6 +236,7 @@ public class JettyAdminServer implements AdminServer { private static final long serialVersionUID = 1L; + @Override protected void doGet( HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { @@ -260,7 +261,7 @@ public class JettyAdminServer implements AdminServer { } // Run the command - final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs); + final CommandResponse cmdResponse = Commands.runGetCommand(cmd, zkServer, kwargs); response.setStatus(cmdResponse.getStatusCode()); final Map<String, String> headers = cmdResponse.getHeaders(); @@ -280,6 +281,60 @@ public class JettyAdminServer implements AdminServer { outputter.output(cmdResponse, response.getOutputStream()); } } + + /** + * Serves HTTP POST requests. It reads request payload as raw data. + * It's up to each command to process the payload accordingly. + * For example, RestoreCommand uses the payload InputStream directly + * to read snapshot data. + */ + @Override + protected void doPost(final HttpServletRequest request, + final HttpServletResponse response) throws ServletException, IOException { + final String cmdName = extractCommandNameFromURL(request, response); + if (cmdName != null) { + final CommandResponse cmdResponse = Commands.runPostCommand(cmdName, zkServer, request.getInputStream()); + final String clientIP = IPAuthenticationProvider.getClientIPAddress(request); + sendJSONResponse(response, cmdResponse, clientIP); + } + } + + /** + * Extracts the command name from URL if it exists otherwise null + */ + private String extractCommandNameFromURL(final HttpServletRequest request, + final HttpServletResponse response) throws IOException { + String cmd = request.getPathInfo(); + if (cmd == null || cmd.equals("/")) { + printCommandLinks(response); + return null; + } + // Strip leading "/" + return cmd.substring(1); + } + + /** + * Prints the list of URLs to each registered command as response. + */ + private void printCommandLinks(final HttpServletResponse response) throws IOException { + for (final String link : commandLinks()) { + response.getWriter().println(link); + response.getWriter().println("<br/>"); + } + } + + /** + * Send JSON string as the response. + */ + private void sendJSONResponse(final HttpServletResponse response, + final CommandResponse cmdResponse, + final String clientIP) throws IOException { + final CommandOutputter outputter = new JsonOutputter(clientIP); + + response.setStatus(cmdResponse.getStatusCode()); + response.setContentType(outputter.getContentType()); + outputter.output(cmdResponse, response.getWriter()); + } } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java new file mode 100644 index 000000000..cb346b2e0 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Command that represents HTTP POST request + */ +package org.apache.zookeeper.server.admin; + +import java.util.List; +import java.util.Map; +import org.apache.zookeeper.server.ZooKeeperServer; + +public abstract class PostCommand extends CommandBase { + protected PostCommand(List<String> names) { + super(names); + } + + @Override + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { + return null; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java index 1a91d1c30..f162f0cc6 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java @@ -99,7 +99,7 @@ public class FileSnap implements SnapShot { SnapStream.checkSealIntegrity(snapIS, ia); } - // deserialize the last processed zxid and check the intact + // deserialize lastProcessedZxid and check inconsistency if (dt.deserializeLastProcessedZxid(ia)) { SnapStream.checkSealIntegrity(snapIS, ia); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java index 9bc8ee519..847b12c28 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java @@ -170,7 +170,7 @@ public class SnapStream { * the checkSum of the content. * */ - static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException { + public static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException { long checkSum = is.getChecksum().getValue(); long val = ia.readLong("val"); ia.readString("path"); // Read and ignore "/" written by SealStream. diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java index 826a875b4..8d9430e1c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java @@ -72,8 +72,8 @@ public class ZKTestCase { // same port. System.setProperty("zookeeper.admin.enableServer", "false"); - // disable rate limiting on the snapshot admin API - System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0"); + // disable rate limiting + System.setProperty("zookeeper.admin.rateLimiterIntervalInMS", "0"); // ZOOKEEPER-2693 disables all 4lw by default. // Here we enable the 4lw which ZooKeeper tests depends. diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java new file mode 100644 index 000000000..85f514dd7 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX; +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.File; +import java.util.Set; +import java.util.zip.CheckedInputStream; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.persistence.SnapStream; +import org.apache.zookeeper.server.persistence.Util; +import org.apache.zookeeper.test.ClientBase; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class ZookeeperServerRestoreTest extends ZKTestCase { + private static final String BASE_PATH = "/restoreFromSnapshotTest"; + private static final int NODE_COUNT = 10; + private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique(); + + @TempDir + static File dataDir; + + @TempDir + static File logDir; + + @Test + public void testRestoreFromSnapshot() throws Exception { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); + + final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000); + final int port = Integer.parseInt(HOST_PORT.split(":")[1]); + final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1); + + ZooKeeper zk1 = null; + ZooKeeper zk2 = null; + ZooKeeper zk3 = null; + + try { + // start the server + serverCnxnFactory.startup(zks); + assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT)); + + // zk1 create test data + zk1 = ClientBase.createZKClient(HOST_PORT); + for (int i = 0; i < NODE_COUNT; i++) { + final String path = BASE_PATH + "-" + i; + zk1.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + // take Snapshot + final File snapshotFile = zks.takeSnapshot(false, false, true); + final long lastZxidFromSnapshot = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX); + + // zk2 create more test data after snapshotting + zk2 = ClientBase.createZKClient(HOST_PORT); + for (int i = NODE_COUNT; i < NODE_COUNT * 2; i++) { + final String path = BASE_PATH + "-" + i; + zk2.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + // restore from snapshot + try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile)) { + final long lastZxidFromRestore = zks.restoreFromSnapshot(is); + + // validate the last processed zxid + assertEquals(lastZxidFromSnapshot, lastZxidFromRestore); + + // validate restored data only contains data from snapshot + zk3 = ClientBase.createZKClient(HOST_PORT); + for (int i = 0; i < NODE_COUNT; i++) { + final String path = BASE_PATH + "-" + i; + final String expectedData = String.valueOf(i); + assertArrayEquals(expectedData.getBytes(), zk3.getData(path, null, null)); + } + assertEquals(NODE_COUNT + 3, zk3.getAllChildrenNumber("/")); + + // validate sessions + final SessionTracker sessionTracker = zks.getSessionTracker(); + final Set<Long> globalSessions = sessionTracker.globalSessions(); + assertEquals(2, globalSessions.size()); + assertTrue(globalSessions.contains(zk1.getSessionId())); + Assertions.assertFalse(globalSessions.contains(zk2.getSessionId())); + assertTrue(globalSessions.contains(zk3.getSessionId())); + + // validate ZookeeperServer state + assertEquals(ZooKeeperServer.State.RUNNING, zks.state); + + // validate being able to create more data after restore + zk3.create(BASE_PATH + "_" + "after", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEquals(NODE_COUNT + 4, zk3.getAllChildrenNumber("/")); + } + } finally { + System.clearProperty("zookeeper.serializeLastProcessedZxid.enabled"); + + if (zk1 != null) { + zk1.close(); + } + if (zk2 != null) { + zk2.close(); + } + if (zk3 != null) { + zk3.close(); + } + + zks.shutdown(); + serverCnxnFactory.shutdown(); + } + } + + @Test + public void testRestoreFromSnapshot_nulInputStream() throws Exception { + final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000); + assertThrows(IllegalArgumentException.class, () -> zks.restoreFromSnapshot(null)); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java index a9953b7c0..c9d74a4cb 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java @@ -16,26 +16,26 @@ * limitations under the License. */ -package org.apache.zookeeper; +package org.apache.zookeeper.server; +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.test.ClientBase; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -public class TakeSnapshotTest extends ClientBase { +public class ZookeeperServerSnapshotTest extends ZKTestCase { private static final String BASE_PATH = "/takeSnapshotTest"; - private static final int NODE_COUNT = 100; - private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique(); - private ZooKeeper zk; + private static final int NODE_COUNT = 10; + private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique(); @TempDir static File dataDir; @@ -43,32 +43,19 @@ public class TakeSnapshotTest extends ClientBase { @TempDir static File logDir; - - @BeforeEach - public void setUp() throws Exception { - super.setUp(); - ClientBase.setupTestEnv(); - } - - @AfterEach - public void tearDown() throws Exception { - if (zk != null) { - zk.close(); - } - } - @Test - public void testTakeSnapshotAndRestore() throws Exception { + public void testTakeSnapshot() throws Exception { ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000); ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); - final int port = Integer.parseInt(HOSTPORT.split(":")[1]); + final int port = Integer.parseInt(HOST_PORT.split(":")[1]); final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1); - serverCnxnFactory.startup(zks); - assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); + ZooKeeper zk = null; + try { + serverCnxnFactory.startup(zks); + assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT)); - try { - zk = ClientBase.createZKClient(HOSTPORT); + zk = ClientBase.createZKClient(HOST_PORT); for (int i = 0; i < NODE_COUNT; i++) { final String path = BASE_PATH + "-" + i; zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -81,14 +68,14 @@ public class TakeSnapshotTest extends ClientBase { zk.close(); zks.shutdown(); - // start server again and assert the data restored from snapshot + // restart server and assert the data restored from snapshot zks = new ZooKeeperServer(dataDir, logDir, 3000); ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false); serverCnxnFactory.startup(zks); - assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); + assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT)); - zk = ClientBase.createZKClient(HOSTPORT); + zk = ClientBase.createZKClient(HOST_PORT); for (int i = 0; i < NODE_COUNT; i++) { final String path = BASE_PATH + "-" + i; final String expectedData = String.valueOf(i); @@ -96,6 +83,10 @@ public class TakeSnapshotTest extends ClientBase { } assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/")); } finally { + if (zk != null) { + zk.close(); + } + zks.shutdown(); serverCnxnFactory.shutdown(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java index 2d74776e3..c60ae868d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java @@ -19,8 +19,9 @@ package org.apache.zookeeper.server.admin; import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED; +import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL; +import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED; import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED; -import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL; import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -57,6 +59,8 @@ public class CommandsTest extends ClientBase { * - the primary name of the command * @param kwargs * - keyword arguments to the command + * @param inputStream + * - InputStream to the command * @param expectedHeaders * - expected HTTP response headers * @param expectedStatusCode @@ -66,11 +70,12 @@ public class CommandsTest extends ClientBase { * @throws IOException * @throws InterruptedException */ - private void testCommand(String cmdName, Map<String, String> kwargs, + private void testCommand(String cmdName, Map<String, String> kwargs, InputStream inputStream, Map<String, String> expectedHeaders, int expectedStatusCode, Field... fields) throws IOException, InterruptedException { ZooKeeperServer zks = serverFactory.getZooKeeperServer(); - final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs); + final CommandResponse commandResponse = inputStream == null + ? Commands.runGetCommand(cmdName, zks, kwargs) : Commands.runPostCommand(cmdName, zks, inputStream); assertNotNull(commandResponse); assertEquals(expectedStatusCode, commandResponse.getStatusCode()); try (final InputStream responseStream = commandResponse.getInputStream()) { @@ -102,7 +107,7 @@ public class CommandsTest extends ClientBase { } public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException { - testCommand(cmdName, new HashMap<String, String>(), new HashMap<>(), HttpServletResponse.SC_OK, fields); + testCommand(cmdName, new HashMap<String, String>(), null, new HashMap<>(), HttpServletResponse.SC_OK, fields); } private static class Field { @@ -116,16 +121,6 @@ public class CommandsTest extends ClientBase { } @Test - public void testSnapshot_streaming() throws IOException, InterruptedException { - testSnapshot(true); - } - - @Test - public void testSnapshot_nonStreaming() throws IOException, InterruptedException { - testSnapshot(false); - } - - @Test public void testConfiguration() throws IOException, InterruptedException { testCommand("configuration", new Field("client_port", Integer.class), new Field("data_dir", String.class), new Field("data_log_dir", String.class), new Field("tick_time", Integer.class), new Field("max_client_cnxns", Integer.class), new Field("min_session_timeout", Integer.class), new Field("max_session_timeout", Integer.class), new Field("server_id", Long.class), new Field("client_port_listen_backlog", Integer.class)); } @@ -231,6 +226,45 @@ public class CommandsTest extends ClientBase { } @Test + public void testRestore_invalidInputStream() throws IOException, InterruptedException { + setupForRestoreCommand(); + + try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes())){ + final Map<String, String> kwargs = new HashMap<>(); + final Map<String, String> expectedHeaders = new HashMap<>(); + testCommand("restore", kwargs, inputStream, expectedHeaders, HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } finally { + clearForRestoreCommand(); + } + } + + @Test + public void testRestore_nullInputStream() { + setupForRestoreCommand(); + final ZooKeeperServer zks = serverFactory.getZooKeeperServer(); + try { + final CommandResponse commandResponse = Commands.runPostCommand("restore", zks, null); + assertNotNull(commandResponse); + assertEquals(HttpServletResponse.SC_BAD_REQUEST, commandResponse.getStatusCode()); + } finally { + clearForRestoreCommand(); + if (zks != null) { + zks.shutdown(); + } + } + } + + @Test + public void testSnapshot_streaming() throws IOException, InterruptedException { + testSnapshot(true); + } + + @Test + public void testSnapshot_nonStreaming() throws IOException, InterruptedException { + testSnapshot(false); + } + + @Test public void testServerStats() throws IOException, InterruptedException { testCommand("server_stats", new Field("version", String.class), new Field("read_only", Boolean.class), new Field("server_stats", ServerStats.class), new Field("node_count", Integer.class), new Field("client_response", BufferStats.class)); } @@ -239,7 +273,7 @@ public class CommandsTest extends ClientBase { public void testSetTraceMask() throws IOException, InterruptedException { Map<String, String> kwargs = new HashMap<String, String>(); kwargs.put("traceMask", "1"); - testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class)); + testCommand("set_trace_mask", kwargs, null, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class)); } @Test @@ -289,7 +323,7 @@ public class CommandsTest extends ClientBase { when(zkServer.getSecureServerCnxnFactory()).thenReturn(cnxnFactory); // Act - CommandResponse response = cmd.run(zkServer, null); + CommandResponse response = cmd.runGet(zkServer, null); // Assert assertThat(response.toMap().containsKey("connections"), is(true)); @@ -313,7 +347,7 @@ public class CommandsTest extends ClientBase { when(zkServer.getZKDatabase()).thenReturn(zkDatabase); when(zkDatabase.getNodeCount()).thenReturn(0); - CommandResponse response = cmd.run(zkServer, null); + CommandResponse response = cmd.runGet(zkServer, null); assertThat(response.toMap().containsKey("connections"), is(true)); assertThat(response.toMap().containsKey("secure_connections"), is(true)); @@ -321,7 +355,7 @@ public class CommandsTest extends ClientBase { private void testSnapshot(final boolean streaming) throws IOException, InterruptedException { System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true"); - System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0"); + System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0"); System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"); try { final Map<String, String> kwargs = new HashMap<>(); @@ -329,12 +363,23 @@ public class CommandsTest extends ClientBase { final Map<String, String> expectedHeaders = new HashMap<>(); expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0"); expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478"); - testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK); + testCommand("snapshot", kwargs, null, expectedHeaders, HttpServletResponse.SC_OK); } finally { System.clearProperty(ADMIN_SNAPSHOT_ENABLED); - System.clearProperty(ADMIN_SNAPSHOT_INTERVAL); + System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL); System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED); } } + private void setupForRestoreCommand() { + System.setProperty(ADMIN_RESTORE_ENABLED, "true"); + System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0"); + System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"); + } + + private void clearForRestoreCommand() { + System.clearProperty(ADMIN_RESTORE_ENABLED); + System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL); + System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java index b06cde6fd..41f00751a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java @@ -63,7 +63,7 @@ public class JettyAdminServerTest extends ZKTestCase { static final String URL_FORMAT = "http://localhost:%d/commands"; static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands"; - static final int jettyAdminPort = PortAssignment.unique(); + private final int jettyAdminPort = PortAssignment.unique(); @BeforeEach public void enableServer() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java index 5b5dbd979..f7d357e72 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java @@ -19,20 +19,22 @@ package org.apache.zookeeper.server.admin; import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED; +import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL; +import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED; import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED; -import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL; import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING; import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT; -import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; @@ -40,6 +42,11 @@ import java.net.URLEncoder; import java.nio.file.Files; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.CheckedInputStream; import javax.servlet.http.HttpServletResponse; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -48,8 +55,11 @@ import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.common.IOUtils; +import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.SnapStream; import org.apache.zookeeper.test.ClientBase; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -60,13 +70,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class SnapshotCommandTest extends ZKTestCase { - private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class); +public class SnapshotAndRestoreCommandTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotAndRestoreCommandTest.class); - private static final String PATH = "/snapshot_test"; + private static final String SNAPSHOT_TEST_PATH = "/snapshot_test"; private static final int NODE_COUNT = 10; - private final String hostPort = "127.0.0.1:" + PortAssignment.unique(); + private final String hostPort = "127.0.0.1:" + PortAssignment.unique(); + private final int jettyAdminPort = PortAssignment.unique(); private ServerCnxnFactory cnxnFactory; private JettyAdminServer adminServer; private ZooKeeperServer zks; @@ -91,9 +102,10 @@ public class SnapshotCommandTest extends ZKTestCase { // start AdminServer System.setProperty("zookeeper.admin.enableServer", "true"); System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort); + System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0"); System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true"); - System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0"); System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"); + System.setProperty(ADMIN_RESTORE_ENABLED, "true"); adminServer = new JettyAdminServer(); adminServer.setZooKeeperServer(zks); @@ -101,7 +113,7 @@ public class SnapshotCommandTest extends ZKTestCase { // create Zookeeper client and test data zk = ClientBase.createZKClient(hostPort); - createData(zk, NODE_COUNT); + createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT); } @AfterAll @@ -109,9 +121,10 @@ public class SnapshotCommandTest extends ZKTestCase { System.clearProperty("zookeeper.4lw.commands.whitelist"); System.clearProperty("zookeeper.admin.enableServer"); System.clearProperty("zookeeper.admin.serverPort"); + System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL); System.clearProperty(ADMIN_SNAPSHOT_ENABLED); - System.clearProperty(ADMIN_SNAPSHOT_INTERVAL); System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED); + System.clearProperty(ADMIN_RESTORE_ENABLED); if (zk != null) { zk.close(); @@ -131,19 +144,85 @@ public class SnapshotCommandTest extends ZKTestCase { } @Test - public void testSnapshotCommand_streaming() throws Exception { - // take snapshot with streaming - final HttpURLConnection snapshotConn = sendSnapshotRequest(true); + public void testSnapshotAndRestoreCommand_streaming() throws Exception { + ServerMetrics.getMetrics().resetAll(); - // validate snapshot response - assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode()); - validateResponseHeaders(snapshotConn); - final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis()); - try (final InputStream inputStream = snapshotConn.getInputStream(); - final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) { - IOUtils.copyBytes(inputStream, outputStream, 1024, true); - final long fileSize = Files.size(snapshotFile.toPath()); - assertTrue(fileSize > 0); + // take snapshot with streaming and validate + final File snapshotFile = takeSnapshotAndValidate(); + + // validate snapshot metrics + validateSnapshotMetrics(); + + // restore from snapshot and validate + performRestoreAndValidate(snapshotFile); + + // validate creating data after restore + try (final ZooKeeper zk = ClientBase.createZKClient(hostPort)) { + createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT + 1); + assertEquals(NODE_COUNT + NODE_COUNT + 1, zk.getAllChildrenNumber(SNAPSHOT_TEST_PATH)); + } + + // validate restore metrics + validateRestoreMetrics(); + } + + @Test + public void testClientRequest_restoreInProgress() throws Exception { + final int threadCount = 2; + final int nodeCount = 50; + final String restoreTestPath = "/restore_test"; + + // take snapshot + final File snapshotFile = takeSnapshotAndValidate(); + + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + final CountDownLatch latch = new CountDownLatch(threadCount); + final AtomicBoolean createSucceeded = new AtomicBoolean(false); + final AtomicBoolean restoreSucceeded = new AtomicBoolean(false); + + // thread 1 creates data + service.submit(() -> { + try { + createData(zk, restoreTestPath, nodeCount); + createSucceeded.set(true); + } catch (final Exception e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + } finally { + latch.countDown(); + } + }); + + // thread 2 performs restore operation + service.submit(() -> { + try { + performRestoreAndValidate(snapshotFile); + restoreSucceeded.set(true); + } catch (final Exception e) { + LOG.error(e.getMessage()); + e.printStackTrace(); + } finally { + latch.countDown(); + } + }); + + // wait for operations completed + latch.await(); + + // validate all client requests succeeded + if (createSucceeded.get() && restoreSucceeded.get()) { + assertEquals(nodeCount, zk.getAllChildrenNumber(restoreTestPath)); + } + } + + @Test + public void testRestores() throws Exception { + // take snapshot + final File snapshotFile = takeSnapshotAndValidate(); + + // perform restores + for (int i = 0; i < 3; i++) { + performRestoreAndValidate(snapshotFile); } } @@ -186,16 +265,47 @@ public class SnapshotCommandTest extends ZKTestCase { } } - private void createData(final ZooKeeper zk, final long count) throws Exception { + @Test + public void testRestoreCommand_disabled() throws Exception { + System.setProperty(ADMIN_RESTORE_ENABLED, "false"); + try { + final HttpURLConnection restoreConn = sendRestoreRequest(); + assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, restoreConn.getResponseCode()); + } finally { + System.setProperty(ADMIN_RESTORE_ENABLED, "true"); + } + } + + @Test + public void testRestoreCommand_serializeLastZxidDisabled() throws Exception { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false); + try { + final HttpURLConnection restoreConn = sendRestoreRequest(); + assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode()); + } finally { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); + } + } + + @Test + public void testRestoreCommand_invalidSnapshotData() throws Exception { + final HttpURLConnection restoreConn = sendRestoreRequest(); + try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes()); + final OutputStream outputStream = restoreConn.getOutputStream()) { + IOUtils.copyBytes(inputStream, outputStream, 1024, true); + } + assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode()); + } + + private void createData(final ZooKeeper zk, final String parentPath, final long count) throws Exception { try { - zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create(parentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (final KeeperException.NodeExistsException ignore) { // ignore } for (int i = 0; i < count; i++) { - final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - LOG.info("Node created. path={}" + processNodePath); + zk.create(String.format("%s/%s", parentPath, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } @@ -208,6 +318,15 @@ public class SnapshotCommandTest extends ZKTestCase { return snapshotConn; } + private HttpURLConnection sendRestoreRequest() throws Exception { + final URL restoreURL = new URL(String.format(URL_FORMAT + "/restore", jettyAdminPort)); + final HttpURLConnection restoreConn = (HttpURLConnection) restoreURL.openConnection(); + restoreConn.setDoOutput(true); + restoreConn.setRequestMethod("POST"); + + return restoreConn; + } + private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception { final Map<String, String> parameters = new HashMap<>(); parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming)); @@ -254,4 +373,47 @@ public class SnapshotCommandTest extends ZKTestCase { LOG.info("Response payload: {}", sb); } } + + private void validateSnapshotMetrics() { + Map<String, Object> metrics = MetricsUtils.currentServerMetrics(); + assertEquals(0, (long) metrics.get("snapshot_error_count")); + assertEquals(0, (long) metrics.get("snapshot_rate_limited_count")); + assertTrue((Double) metrics.get("avg_snapshottime") > 0.0); + } + + private void validateRestoreMetrics() { + Map<String, Object> metrics = MetricsUtils.currentServerMetrics(); + assertEquals(0, (long) metrics.get("restore_error_count")); + assertEquals(0, (long) metrics.get("restore_rate_limited_count")); + assertTrue((Double) metrics.get("avg_restore_time") > 0.0); + } + + private File takeSnapshotAndValidate() throws Exception { + // take snapshot with streaming + final HttpURLConnection snapshotConn = sendSnapshotRequest(true); + + // validate snapshot response + assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode()); + validateResponseHeaders(snapshotConn); + final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis()); + try (final InputStream inputStream = snapshotConn.getInputStream(); + final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) { + IOUtils.copyBytes(inputStream, outputStream, 1024, true); + final long fileSize = Files.size(snapshotFile.toPath()); + assertTrue(fileSize > 0); + } + return snapshotFile; + } + + private void performRestoreAndValidate(final File snapshotFile) throws Exception { + // perform restore + final HttpURLConnection restoreConn = sendRestoreRequest(); + try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile); + final OutputStream outputStream = restoreConn.getOutputStream()) { + IOUtils.copyBytes(is, outputStream, 1024, true); + } + // validate restore response + assertEquals(HttpURLConnection.HTTP_OK, restoreConn.getResponseCode()); + displayResponsePayload(restoreConn); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java index ffd659978..573d361ae 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java @@ -293,7 +293,7 @@ public class ObserverMasterTest extends ObserverMasterTestBase { // test stats collection final Map<String, String> emptyMap = Collections.emptyMap(); - Map<String, Object> stats = Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap(); + Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap(); assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id"); // check the stats for the first peer |