summaryrefslogtreecommitdiff
path: root/zookeeper-server/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/src/main/java/org')
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java36
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java12
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java52
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java6
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java78
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java102
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java26
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java6
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java52
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java17
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java10
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java4
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java60
13 files changed, 439 insertions, 22 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index 2818e15aa..cd45a7c34 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -1745,6 +1745,42 @@ public class DataTree {
}
/**
+ * Serializes the lastProcessedZxid so we can get it from snapshot instead the snapshot file name.
+ * This is needed for performing snapshot and restore via admin server commands.
+ *
+ * @param oa the output stream to write to
+ * @return true if the lastProcessedZxid is serialized successfully, otherwise false
+ * @throws IOException if there is an I/O error
+ */
+ public boolean serializeLastProcessedZxid(final OutputArchive oa) throws IOException {
+ if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) {
+ return false;
+ }
+ oa.writeLong(lastProcessedZxid, "lastZxid");
+ return true;
+ }
+
+ /**
+ * Deserializes the lastProcessedZxid from the input stream and updates the lastProcessedZxid field.
+ *
+ * @param ia the input stream to read from
+ * @return true if lastProcessedZxid is deserialized successfully, otherwise false
+ * @throws IOException if there is an I/O error
+ */
+ public boolean deserializeLastProcessedZxid(final InputArchive ia) throws IOException {
+ if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) {
+ return false;
+ }
+ try {
+ lastProcessedZxid = ia.readLong("lastZxid");
+ } catch (final EOFException e) {
+ LOG.warn("Got EOFException while reading the last processed zxid, likely due to reading an older snapshot.");
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Compares the actual tree's digest with that in the snapshot.
* Resets digestFromLoadedSnapshot after comparision.
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index af156542f..ef28e32e1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -72,6 +72,8 @@ public final class ServerMetrics {
FSYNC_TIME = metricsContext.getSummary("fsynctime", DetailLevel.BASIC);
SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
+ SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
+ SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
@@ -277,6 +279,16 @@ public final class ServerMetrics {
public final Summary SNAPSHOT_TIME;
/**
+ * Snapshot error count
+ */
+ public final Counter SNAPSHOT_ERROR_COUNT;
+
+ /**
+ * Snapshot rate limited count
+ */
+ public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
+
+ /**
* Db init time (snapshot loading + txnlog replay)
*/
public final Summary DB_INIT_TIME;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index f6c2b93eb..d61f269c9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -120,6 +120,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
private static boolean digestEnabled;
+ public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
+ private static boolean serializeLastProcessedZxidEnabled;
+
// Add a enable/disable option for now, we should remove this one when
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
@@ -153,6 +156,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+
+ setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
+ System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
}
// @VisibleForTesting
@@ -535,23 +541,46 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot() {
+ public void takeSnapshot() throws IOException {
takeSnapshot(false);
}
- public void takeSnapshot(boolean syncSnap) {
+ public void takeSnapshot(boolean syncSnap) throws IOException {
+ takeSnapshot(syncSnap, true, false);
+ }
+
+ /**
+ * Takes a snapshot on the server.
+ *
+ * @param syncSnap syncSnap sync the snapshot immediately after write
+ * @param isSevere if true system exist, otherwise throw IOException
+ * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions
+ *
+ * @return file snapshot file object
+ * @throws IOException
+ */
+ public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
long start = Time.currentElapsedTime();
+ File snapFile = null;
try {
- txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
+ if (fastForwardFromEdits) {
+ zkDb.fastForwardDataBase();
+ }
+ snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
- LOG.error("Severe unrecoverable error, exiting", e);
- // This is a severe error that we cannot recover from,
- // so we need to exit
- ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+ if (isSevere) {
+ LOG.error("Severe unrecoverable error, exiting", e);
+ // This is a severe error that we cannot recover from,
+ // so we need to exit
+ ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+ } else {
+ throw e;
+ }
}
long elapsed = Time.currentElapsedTime() - start;
LOG.info("Snapshot taken in {} ms", elapsed);
ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
+ return snapFile;
}
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
@@ -2139,6 +2168,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
ZooKeeperServer.digestEnabled = digestEnabled;
}
+ public static boolean isSerializeLastProcessedZxidEnabled() {
+ return serializeLastProcessedZxidEnabled;
+ }
+
+ public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
+ serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
+ LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled);
+ }
+
/**
* Trim a path to get the immediate predecessor.
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
index a8fe8bdc0..b9a87830e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
@@ -18,6 +18,7 @@
package org.apache.zookeeper.server.admin;
+import java.io.OutputStream;
import java.io.PrintWriter;
/**
@@ -31,6 +32,9 @@ public interface CommandOutputter {
/** The MIME type of this output (e.g., "application/json") */
String getContentType();
- void output(CommandResponse response, PrintWriter pw);
+ /** Print out data as output */
+ default void output(CommandResponse response, PrintWriter pw) {}
+ /** Stream out data as output */
+ default void output(final CommandResponse response, final OutputStream os) {}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
index d9e7239a1..f47b4e942 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
@@ -17,8 +17,11 @@
package org.apache.zookeeper.server.admin;
+import java.io.InputStream;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
/**
* A response from running a {@link Command}.
@@ -37,6 +40,9 @@ public class CommandResponse {
private final String command;
private final String error;
private final Map<String, Object> data;
+ private final Map<String, String> headers;
+ private int statusCode;
+ private InputStream inputStream;
/**
* Creates a new response with no error string.
@@ -44,18 +50,35 @@ public class CommandResponse {
* @param command command name
*/
public CommandResponse(String command) {
- this(command, null);
+ this(command, null, HttpServletResponse.SC_OK);
}
/**
* Creates a new response.
*
* @param command command name
* @param error error string (may be null)
+ * @param statusCode http status code
*/
- public CommandResponse(String command, String error) {
+ public CommandResponse(String command, String error, int statusCode) {
+ this(command, error, statusCode, null);
+ }
+
+
+ /**
+ * Creates a new response.
+ *
+ * @param command command name
+ * @param error error string (may be null)
+ * @param statusCode http status code
+ * @param inputStream inputStream to send out data (may be null)
+ */
+ public CommandResponse(final String command, final String error, final int statusCode, final InputStream inputStream) {
this.command = command;
this.error = error;
data = new LinkedHashMap<String, Object>();
+ headers = new HashMap<>();
+ this.statusCode = statusCode;
+ this.inputStream = inputStream;
}
/**
@@ -77,6 +100,38 @@ public class CommandResponse {
}
/**
+ * Gets the http status code
+ *
+ * @return http status code
+ */
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ /**
+ * Sets the http status code
+ */
+ public void setStatusCode(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ /**
+ * Gets the InputStream (may be null).
+ *
+ * @return InputStream
+ */
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ /**
+ * Sets the InputStream
+ */
+ public void setInputStream(final InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ /**
* Adds a key/value pair to this response.
*
* @param key key
@@ -97,6 +152,25 @@ public class CommandResponse {
}
/**
+ * Adds a header to this response.
+ *
+ * @param name name of the header
+ * @param value value of the header
+ */
+ public void addHeader(final String name, final String value) {
+ headers.put(name, value);
+ }
+
+ /**
+ * Returns all headers
+ *
+ * @return map representation of all headers
+ */
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ /**
* Converts this response to a map. The returned map is mutable, and
* changes to it do not reflect back into this response.
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 236c7ec24..848583a0f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -18,8 +18,11 @@
package org.apache.zookeeper.server.admin;
+import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX;
import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
@@ -31,7 +34,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletResponse;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.Environment.Entry;
import org.apache.zookeeper.Version;
@@ -41,6 +46,7 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.persistence.SnapshotInfo;
+import org.apache.zookeeper.server.persistence.Util;
import org.apache.zookeeper.server.quorum.Follower;
import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import org.apache.zookeeper.server.quorum.Leader;
@@ -51,7 +57,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.RateLimiter;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,10 +112,12 @@ public class Commands {
Map<String, String> kwargs) {
Command command = getCommand(cmdName);
if (command == null) {
- return new CommandResponse(cmdName, "Unknown command: " + cmdName);
+ // set the status code to 200 to keep the current behavior of existing commands
+ return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
}
if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
- return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests");
+ // set the status code to 200 to keep the current behavior of existing commands
+ return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
}
return command.run(zkServer, kwargs);
}
@@ -144,6 +154,7 @@ public class Commands {
registerCommand(new ObserverCnxnStatResetCommand());
registerCommand(new RuokCommand());
registerCommand(new SetTraceMaskCommand());
+ registerCommand(new SnapshotCommand());
registerCommand(new SrvrCommand());
registerCommand(new StatCommand());
registerCommand(new StatResetCommand());
@@ -531,6 +542,93 @@ public class Commands {
}
/**
+ * Take a snapshot of current server and stream out the data.
+ *
+ * Argument:
+ * - "streaming": optional String to indicate whether streaming out data
+ *
+ * Returned snapshot as stream if streaming is true and metadata of the snapshot
+ * - "last_zxid": String
+ * - "snapshot_size": String
+ */
+ public static class SnapshotCommand extends CommandBase {
+ static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
+
+ static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
+ static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
+
+ static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
+ static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS";
+
+ private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000"));
+
+ private final RateLimiter rateLimiter;
+
+ public SnapshotCommand() {
+ super(Arrays.asList("snapshot", "snap"));
+ rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS);
+ }
+
+ @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
+ justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
+ @Override
+ public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
+ final CommandResponse response = initializeResponse();
+
+ // check feature flag
+ final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "false"));
+ if (!snapshotEnabled) {
+ response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ LOG.warn("Snapshot command is disabled");
+ return response;
+ }
+
+ if (!zkServer.isSerializeLastProcessedZxidEnabled()) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ LOG.warn("Snapshot command requires serializeLastProcessedZxidEnable flag is set to true");
+ return response;
+ }
+
+ // check rate limiting
+ if (!rateLimiter.allow()) {
+ response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429);
+ ServerMetrics.getMetrics().SNAPSHOT_RATE_LIMITED_COUNT.add(1);
+ LOG.warn("Snapshot request was rate limited");
+ return response;
+ }
+
+ // check the streaming query param
+ boolean streaming = true;
+ if (kwargs.containsKey(REQUEST_QUERY_PARAM_STREAMING)) {
+ streaming = Boolean.parseBoolean(kwargs.get(REQUEST_QUERY_PARAM_STREAMING));
+ }
+
+ // take snapshot and stream out data if needed
+ try {
+ final File snapshotFile = zkServer.takeSnapshot(false, false, true);
+ final long lastZxid = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX);
+ response.addHeader(RESPONSE_HEADER_LAST_ZXID, "0x" + ZxidUtils.zxidToString(lastZxid));
+
+ final long size = snapshotFile.length();
+ response.addHeader(RESPONSE_HEADER_SNAPSHOT_SIZE, String.valueOf(size));
+
+ if (size == 0) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1);
+ LOG.warn("Snapshot file {} is empty", snapshotFile);
+ } else if (streaming) {
+ response.setInputStream(new FileInputStream(snapshotFile));
+ }
+ } catch (final Exception e) {
+ response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1);
+ LOG.warn("Exception occurred when taking the snapshot via the snapshot admin command", e);
+ }
+ return response;
+ }
+ }
+
+ /**
* Server information. Returned map contains:
* - "version": String
* version of server
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
index 99241dcbd..3c82f8552 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.SecretUtils;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.auth.IPAuthenticationProvider;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler;
@@ -259,15 +260,26 @@ public class JettyAdminServer implements AdminServer {
}
// Run the command
- CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+ final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+ response.setStatus(cmdResponse.getStatusCode());
- // Format and print the output of the command
- CommandOutputter outputter = new JsonOutputter();
- response.setStatus(HttpServletResponse.SC_OK);
- response.setContentType(outputter.getContentType());
- outputter.output(cmdResponse, response.getWriter());
+ final Map<String, String> headers = cmdResponse.getHeaders();
+ for (final Map.Entry<String, String> header : headers.entrySet()) {
+ response.addHeader(header.getKey(), header.getValue());
+ }
+ final String clientIP = IPAuthenticationProvider.getClientIPAddress(request);
+ if (cmdResponse.getInputStream() == null) {
+ // Format and print the output of the command
+ CommandOutputter outputter = new JsonOutputter(clientIP);
+ response.setContentType(outputter.getContentType());
+ outputter.output(cmdResponse, response.getWriter());
+ } else {
+ // Stream out the output of the command
+ CommandOutputter outputter = new StreamOutputter(clientIP);
+ response.setContentType(outputter.getContentType());
+ outputter.output(cmdResponse, response.getOutputStream());
+ }
}
-
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
index 7d9457453..44c88de9d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
@@ -35,12 +35,14 @@ public class JsonOutputter implements CommandOutputter {
public static final String ERROR_RESPONSE = "{\"error\": \"Exception writing command response to JSON\"}";
private ObjectMapper mapper;
+ private final String clientIP;
- public JsonOutputter() {
+ public JsonOutputter(final String clientIP) {
mapper = new ObjectMapper();
mapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
+ this.clientIP = clientIP;
}
@Override
@@ -59,7 +61,7 @@ public class JsonOutputter implements CommandOutputter {
LOG.warn("Exception writing command response to JSON:", e);
pw.write(ERROR_RESPONSE);
} catch (IOException e) {
- LOG.warn("Exception writing command response to JSON:", e);
+ LOG.warn("Exception writing command response as JSON to {}", clientIP, e);
pw.write(ERROR_RESPONSE);
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
new file mode 100644
index 000000000..e8f68e649
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.zookeeper.common.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for streaming data out.
+ */
+public class StreamOutputter implements CommandOutputter{
+ private static final Logger LOG = LoggerFactory.getLogger(StreamOutputter.class);
+ private final String clientIP;
+
+ public StreamOutputter(final String clientIP) {
+ this.clientIP = clientIP;
+ }
+
+ @Override
+ public String getContentType() {
+ return "application/octet-stream";
+ }
+
+ @Override
+ public void output(final CommandResponse response, final OutputStream os) {
+ try (final InputStream is = response.getInputStream()){
+ IOUtils.copyBytes(is, os, 1024, true);
+ } catch (final IOException e) {
+ LOG.error("Exception occurred when streaming out data to {}", clientIP, e);
+ }
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
index b93e55a32..9f6fb4005 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
@@ -18,11 +18,14 @@
package org.apache.zookeeper.server.auth;
+import java.util.StringTokenizer;
+import javax.servlet.http.HttpServletRequest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.ServerCnxn;
public class IPAuthenticationProvider implements AuthenticationProvider {
+ private static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For";
public String getScheme() {
return "ip";
@@ -128,4 +131,18 @@ public class IPAuthenticationProvider implements AuthenticationProvider {
return true;
}
+ /**
+ * Returns the HTTP(s) client IP address
+ * @param request HttpServletRequest
+ * @return IP address
+ */
+ public static String getClientIPAddress(final HttpServletRequest request) {
+ // to handle the case that a HTTP(s) client connects via a proxy or load balancer
+ final String xForwardedForHeader = request.getHeader(X_FORWARDED_FOR_HEADER_NAME);
+ if (xForwardedForHeader == null) {
+ return request.getRemoteAddr();
+ }
+ // the format of the field is: X-Forwarded-For: client, proxy1, proxy2 ...
+ return new StringTokenizer(xForwardedForHeader, ",").nextToken().trim();
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index df8a59c1a..1a91d1c30 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -99,6 +99,11 @@ public class FileSnap implements SnapShot {
SnapStream.checkSealIntegrity(snapIS, ia);
}
+ // deserialize the last processed zxid and check the intact
+ if (dt.deserializeLastProcessedZxid(ia)) {
+ SnapStream.checkSealIntegrity(snapIS, ia);
+ }
+
foundValid = true;
break;
} catch (IOException e) {
@@ -255,6 +260,11 @@ public class FileSnap implements SnapShot {
SnapStream.sealStream(snapOS, oa);
}
+ // serialize the last processed zxid and add another CRC check
+ if (dt.serializeLastProcessedZxid(oa)) {
+ SnapStream.sealStream(snapOS, oa);
+ }
+
lastSnapshotInfo = new SnapshotInfo(
Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
snapShot.lastModified() / 1000);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 403720bda..16f9cf71e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -468,9 +468,10 @@ public class FileTxnSnapLog {
* @param sessionsWithTimeouts the session timeouts to be
* serialized onto disk
* @param syncSnap sync the snapshot immediately after write
+ * @return the snapshot file
* @throws IOException
*/
- public void save(
+ public File save(
DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap) throws IOException {
@@ -479,6 +480,7 @@ public class FileTxnSnapLog {
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
try {
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
+ return snapshotFile;
} catch (IOException e) {
if (snapshotFile.length() == 0) {
/* This may be caused by a full disk. In such a case, the server
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java
new file mode 100644
index 000000000..cb9473306
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.common.Time;
+
+/**
+ * A class that provides simple interval-based rate limiting implementation.
+ */
+public class RateLimiter {
+ private final int rate;
+ private final long intervalInMs;
+ private long lastTimeReset;
+ private final AtomicInteger remained;
+
+ public RateLimiter(final int rate, final long interval, final TimeUnit unit) {
+ this.rate = rate;
+ this.intervalInMs = unit.toMillis(interval);
+ this.lastTimeReset = Time.currentElapsedTime();
+ this.remained = new AtomicInteger(rate);
+ }
+
+ public boolean allow() {
+ final long now = Time.currentElapsedTime();
+
+ // reset the rate if interval passed
+ if (now > lastTimeReset + intervalInMs) {
+ remained.set(rate);
+ lastTimeReset = now;
+ }
+
+ int value = remained.get();
+ boolean allowed = false;
+
+ // to handle race condition
+ while (!allowed && value > 0) {
+ allowed = remained.compareAndSet(value, value - 1);
+ value = remained.get();
+ }
+ return allowed;
+ }
+}