diff options
Diffstat (limited to 'zookeeper-server/src/main/java/org')
13 files changed, 439 insertions, 22 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 2818e15aa..cd45a7c34 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -1745,6 +1745,42 @@ public class DataTree { } /** + * Serializes the lastProcessedZxid so we can get it from snapshot instead the snapshot file name. + * This is needed for performing snapshot and restore via admin server commands. + * + * @param oa the output stream to write to + * @return true if the lastProcessedZxid is serialized successfully, otherwise false + * @throws IOException if there is an I/O error + */ + public boolean serializeLastProcessedZxid(final OutputArchive oa) throws IOException { + if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) { + return false; + } + oa.writeLong(lastProcessedZxid, "lastZxid"); + return true; + } + + /** + * Deserializes the lastProcessedZxid from the input stream and updates the lastProcessedZxid field. + * + * @param ia the input stream to read from + * @return true if lastProcessedZxid is deserialized successfully, otherwise false + * @throws IOException if there is an I/O error + */ + public boolean deserializeLastProcessedZxid(final InputArchive ia) throws IOException { + if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) { + return false; + } + try { + lastProcessedZxid = ia.readLong("lastZxid"); + } catch (final EOFException e) { + LOG.warn("Got EOFException while reading the last processed zxid, likely due to reading an older snapshot."); + return false; + } + return true; + } + + /** * Compares the actual tree's digest with that in the snapshot. * Resets digestFromLoadedSnapshot after comparision. * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index af156542f..ef28e32e1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -72,6 +72,8 @@ public final class ServerMetrics { FSYNC_TIME = metricsContext.getSummary("fsynctime", DetailLevel.BASIC); SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC); + SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count"); + SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count"); DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC); READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED); UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED); @@ -277,6 +279,16 @@ public final class ServerMetrics { public final Summary SNAPSHOT_TIME; /** + * Snapshot error count + */ + public final Counter SNAPSHOT_ERROR_COUNT; + + /** + * Snapshot rate limited count + */ + public final Counter SNAPSHOT_RATE_LIMITED_COUNT; + + /** * Db init time (snapshot loading + txnlog replay) */ public final Summary DB_INIT_TIME; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index f6c2b93eb..d61f269c9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -120,6 +120,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; + public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; + private static boolean serializeLastProcessedZxidEnabled; + // Add a enable/disable option for now, we should remove this one when // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; @@ -153,6 +156,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { closeSessionTxnEnabled = Boolean.parseBoolean( System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + + setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( + System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); } // @VisibleForTesting @@ -535,23 +541,46 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public void takeSnapshot() { + public void takeSnapshot() throws IOException { takeSnapshot(false); } - public void takeSnapshot(boolean syncSnap) { + public void takeSnapshot(boolean syncSnap) throws IOException { + takeSnapshot(syncSnap, true, false); + } + + /** + * Takes a snapshot on the server. + * + * @param syncSnap syncSnap sync the snapshot immediately after write + * @param isSevere if true system exist, otherwise throw IOException + * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions + * + * @return file snapshot file object + * @throws IOException + */ + public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { long start = Time.currentElapsedTime(); + File snapFile = null; try { - txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); + if (fastForwardFromEdits) { + zkDb.fastForwardDataBase(); + } + snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { - LOG.error("Severe unrecoverable error, exiting", e); - // This is a severe error that we cannot recover from, - // so we need to exit - ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + if (isSevere) { + LOG.error("Severe unrecoverable error, exiting", e); + // This is a severe error that we cannot recover from, + // so we need to exit + ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + } else { + throw e; + } } long elapsed = Time.currentElapsedTime() - start; LOG.info("Snapshot taken in {} ms", elapsed); ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); + return snapFile; } public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { @@ -2139,6 +2168,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ZooKeeperServer.digestEnabled = digestEnabled; } + public static boolean isSerializeLastProcessedZxidEnabled() { + return serializeLastProcessedZxidEnabled; + } + + public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { + serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; + LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); + } + /** * Trim a path to get the immediate predecessor. * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java index a8fe8bdc0..b9a87830e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.admin; +import java.io.OutputStream; import java.io.PrintWriter; /** @@ -31,6 +32,9 @@ public interface CommandOutputter { /** The MIME type of this output (e.g., "application/json") */ String getContentType(); - void output(CommandResponse response, PrintWriter pw); + /** Print out data as output */ + default void output(CommandResponse response, PrintWriter pw) {} + /** Stream out data as output */ + default void output(final CommandResponse response, final OutputStream os) {} } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java index d9e7239a1..f47b4e942 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java @@ -17,8 +17,11 @@ package org.apache.zookeeper.server.admin; +import java.io.InputStream; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import javax.servlet.http.HttpServletResponse; /** * A response from running a {@link Command}. @@ -37,6 +40,9 @@ public class CommandResponse { private final String command; private final String error; private final Map<String, Object> data; + private final Map<String, String> headers; + private int statusCode; + private InputStream inputStream; /** * Creates a new response with no error string. @@ -44,18 +50,35 @@ public class CommandResponse { * @param command command name */ public CommandResponse(String command) { - this(command, null); + this(command, null, HttpServletResponse.SC_OK); } /** * Creates a new response. * * @param command command name * @param error error string (may be null) + * @param statusCode http status code */ - public CommandResponse(String command, String error) { + public CommandResponse(String command, String error, int statusCode) { + this(command, error, statusCode, null); + } + + + /** + * Creates a new response. + * + * @param command command name + * @param error error string (may be null) + * @param statusCode http status code + * @param inputStream inputStream to send out data (may be null) + */ + public CommandResponse(final String command, final String error, final int statusCode, final InputStream inputStream) { this.command = command; this.error = error; data = new LinkedHashMap<String, Object>(); + headers = new HashMap<>(); + this.statusCode = statusCode; + this.inputStream = inputStream; } /** @@ -77,6 +100,38 @@ public class CommandResponse { } /** + * Gets the http status code + * + * @return http status code + */ + public int getStatusCode() { + return statusCode; + } + + /** + * Sets the http status code + */ + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + /** + * Gets the InputStream (may be null). + * + * @return InputStream + */ + public InputStream getInputStream() { + return inputStream; + } + + /** + * Sets the InputStream + */ + public void setInputStream(final InputStream inputStream) { + this.inputStream = inputStream; + } + + /** * Adds a key/value pair to this response. * * @param key key @@ -97,6 +152,25 @@ public class CommandResponse { } /** + * Adds a header to this response. + * + * @param name name of the header + * @param value value of the header + */ + public void addHeader(final String name, final String value) { + headers.put(name, value); + } + + /** + * Returns all headers + * + * @return map representation of all headers + */ + public Map<String, String> getHeaders() { + return headers; + } + + /** * Converts this response to a map. The returned map is mutable, and * changes to it do not reflect back into this response. * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 236c7ec24..848583a0f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -18,8 +18,11 @@ package org.apache.zookeeper.server.admin; +import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.File; +import java.io.FileInputStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; @@ -31,7 +34,9 @@ import java.util.Properties; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.servlet.http.HttpServletResponse; import org.apache.zookeeper.Environment; import org.apache.zookeeper.Environment.Entry; import org.apache.zookeeper.Version; @@ -41,6 +46,7 @@ import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.persistence.SnapshotInfo; +import org.apache.zookeeper.server.persistence.Util; import org.apache.zookeeper.server.quorum.Follower; import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; import org.apache.zookeeper.server.quorum.Leader; @@ -51,7 +57,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.RateLimiter; import org.apache.zookeeper.server.util.ZxidUtils; +import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,10 +112,12 @@ public class Commands { Map<String, String> kwargs) { Command command = getCommand(cmdName); if (command == null) { - return new CommandResponse(cmdName, "Unknown command: " + cmdName); + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK); } if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) { - return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests"); + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK); } return command.run(zkServer, kwargs); } @@ -144,6 +154,7 @@ public class Commands { registerCommand(new ObserverCnxnStatResetCommand()); registerCommand(new RuokCommand()); registerCommand(new SetTraceMaskCommand()); + registerCommand(new SnapshotCommand()); registerCommand(new SrvrCommand()); registerCommand(new StatCommand()); registerCommand(new StatResetCommand()); @@ -531,6 +542,93 @@ public class Commands { } /** + * Take a snapshot of current server and stream out the data. + * + * Argument: + * - "streaming": optional String to indicate whether streaming out data + * + * Returned snapshot as stream if streaming is true and metadata of the snapshot + * - "last_zxid": String + * - "snapshot_size": String + */ + public static class SnapshotCommand extends CommandBase { + static final String REQUEST_QUERY_PARAM_STREAMING = "streaming"; + + static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid"; + static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size"; + + static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled"; + static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS"; + + private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000")); + + private final RateLimiter rateLimiter; + + public SnapshotCommand() { + super(Arrays.asList("snapshot", "snap")); + rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS); + } + + @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION", + justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter") + @Override + public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) { + final CommandResponse response = initializeResponse(); + + // check feature flag + final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "false")); + if (!snapshotEnabled) { + response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + LOG.warn("Snapshot command is disabled"); + return response; + } + + if (!zkServer.isSerializeLastProcessedZxidEnabled()) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + LOG.warn("Snapshot command requires serializeLastProcessedZxidEnable flag is set to true"); + return response; + } + + // check rate limiting + if (!rateLimiter.allow()) { + response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429); + ServerMetrics.getMetrics().SNAPSHOT_RATE_LIMITED_COUNT.add(1); + LOG.warn("Snapshot request was rate limited"); + return response; + } + + // check the streaming query param + boolean streaming = true; + if (kwargs.containsKey(REQUEST_QUERY_PARAM_STREAMING)) { + streaming = Boolean.parseBoolean(kwargs.get(REQUEST_QUERY_PARAM_STREAMING)); + } + + // take snapshot and stream out data if needed + try { + final File snapshotFile = zkServer.takeSnapshot(false, false, true); + final long lastZxid = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX); + response.addHeader(RESPONSE_HEADER_LAST_ZXID, "0x" + ZxidUtils.zxidToString(lastZxid)); + + final long size = snapshotFile.length(); + response.addHeader(RESPONSE_HEADER_SNAPSHOT_SIZE, String.valueOf(size)); + + if (size == 0) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1); + LOG.warn("Snapshot file {} is empty", snapshotFile); + } else if (streaming) { + response.setInputStream(new FileInputStream(snapshotFile)); + } + } catch (final Exception e) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1); + LOG.warn("Exception occurred when taking the snapshot via the snapshot admin command", e); + } + return response; + } + } + + /** * Server information. Returned map contains: * - "version": String * version of server diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java index 99241dcbd..3c82f8552 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java @@ -33,6 +33,7 @@ import org.apache.zookeeper.common.QuorumX509Util; import org.apache.zookeeper.common.SecretUtils; import org.apache.zookeeper.common.X509Util; import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.auth.IPAuthenticationProvider; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; @@ -259,15 +260,26 @@ public class JettyAdminServer implements AdminServer { } // Run the command - CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs); + final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs); + response.setStatus(cmdResponse.getStatusCode()); - // Format and print the output of the command - CommandOutputter outputter = new JsonOutputter(); - response.setStatus(HttpServletResponse.SC_OK); - response.setContentType(outputter.getContentType()); - outputter.output(cmdResponse, response.getWriter()); + final Map<String, String> headers = cmdResponse.getHeaders(); + for (final Map.Entry<String, String> header : headers.entrySet()) { + response.addHeader(header.getKey(), header.getValue()); + } + final String clientIP = IPAuthenticationProvider.getClientIPAddress(request); + if (cmdResponse.getInputStream() == null) { + // Format and print the output of the command + CommandOutputter outputter = new JsonOutputter(clientIP); + response.setContentType(outputter.getContentType()); + outputter.output(cmdResponse, response.getWriter()); + } else { + // Stream out the output of the command + CommandOutputter outputter = new StreamOutputter(clientIP); + response.setContentType(outputter.getContentType()); + outputter.output(cmdResponse, response.getOutputStream()); + } } - } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java index 7d9457453..44c88de9d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java @@ -35,12 +35,14 @@ public class JsonOutputter implements CommandOutputter { public static final String ERROR_RESPONSE = "{\"error\": \"Exception writing command response to JSON\"}"; private ObjectMapper mapper; + private final String clientIP; - public JsonOutputter() { + public JsonOutputter(final String clientIP) { mapper = new ObjectMapper(); mapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true); mapper.configure(SerializationFeature.INDENT_OUTPUT, true); mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + this.clientIP = clientIP; } @Override @@ -59,7 +61,7 @@ public class JsonOutputter implements CommandOutputter { LOG.warn("Exception writing command response to JSON:", e); pw.write(ERROR_RESPONSE); } catch (IOException e) { - LOG.warn("Exception writing command response to JSON:", e); + LOG.warn("Exception writing command response as JSON to {}", clientIP, e); pw.write(ERROR_RESPONSE); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java new file mode 100644 index 000000000..e8f68e649 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.admin; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.zookeeper.common.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class for streaming data out. + */ +public class StreamOutputter implements CommandOutputter{ + private static final Logger LOG = LoggerFactory.getLogger(StreamOutputter.class); + private final String clientIP; + + public StreamOutputter(final String clientIP) { + this.clientIP = clientIP; + } + + @Override + public String getContentType() { + return "application/octet-stream"; + } + + @Override + public void output(final CommandResponse response, final OutputStream os) { + try (final InputStream is = response.getInputStream()){ + IOUtils.copyBytes(is, os, 1024, true); + } catch (final IOException e) { + LOG.error("Exception occurred when streaming out data to {}", clientIP, e); + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java index b93e55a32..9f6fb4005 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java @@ -18,11 +18,14 @@ package org.apache.zookeeper.server.auth; +import java.util.StringTokenizer; +import javax.servlet.http.HttpServletRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.ServerCnxn; public class IPAuthenticationProvider implements AuthenticationProvider { + private static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For"; public String getScheme() { return "ip"; @@ -128,4 +131,18 @@ public class IPAuthenticationProvider implements AuthenticationProvider { return true; } + /** + * Returns the HTTP(s) client IP address + * @param request HttpServletRequest + * @return IP address + */ + public static String getClientIPAddress(final HttpServletRequest request) { + // to handle the case that a HTTP(s) client connects via a proxy or load balancer + final String xForwardedForHeader = request.getHeader(X_FORWARDED_FOR_HEADER_NAME); + if (xForwardedForHeader == null) { + return request.getRemoteAddr(); + } + // the format of the field is: X-Forwarded-For: client, proxy1, proxy2 ... + return new StringTokenizer(xForwardedForHeader, ",").nextToken().trim(); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java index df8a59c1a..1a91d1c30 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java @@ -99,6 +99,11 @@ public class FileSnap implements SnapShot { SnapStream.checkSealIntegrity(snapIS, ia); } + // deserialize the last processed zxid and check the intact + if (dt.deserializeLastProcessedZxid(ia)) { + SnapStream.checkSealIntegrity(snapIS, ia); + } + foundValid = true; break; } catch (IOException e) { @@ -255,6 +260,11 @@ public class FileSnap implements SnapShot { SnapStream.sealStream(snapOS, oa); } + // serialize the last processed zxid and add another CRC check + if (dt.serializeLastProcessedZxid(oa)) { + SnapStream.sealStream(snapOS, oa); + } + lastSnapshotInfo = new SnapshotInfo( Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX), snapShot.lastModified() / 1000); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 403720bda..16f9cf71e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -468,9 +468,10 @@ public class FileTxnSnapLog { * @param sessionsWithTimeouts the session timeouts to be * serialized onto disk * @param syncSnap sync the snapshot immediately after write + * @return the snapshot file * @throws IOException */ - public void save( + public File save( DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap) throws IOException { @@ -479,6 +480,7 @@ public class FileTxnSnapLog { LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); try { snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); + return snapshotFile; } catch (IOException e) { if (snapshotFile.length() == 0) { /* This may be caused by a full disk. In such a case, the server diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java new file mode 100644 index 000000000..cb9473306 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.zookeeper.common.Time; + +/** + * A class that provides simple interval-based rate limiting implementation. + */ +public class RateLimiter { + private final int rate; + private final long intervalInMs; + private long lastTimeReset; + private final AtomicInteger remained; + + public RateLimiter(final int rate, final long interval, final TimeUnit unit) { + this.rate = rate; + this.intervalInMs = unit.toMillis(interval); + this.lastTimeReset = Time.currentElapsedTime(); + this.remained = new AtomicInteger(rate); + } + + public boolean allow() { + final long now = Time.currentElapsedTime(); + + // reset the rate if interval passed + if (now > lastTimeReset + intervalInMs) { + remained.set(rate); + lastTimeReset = now; + } + + int value = remained.get(); + boolean allowed = false; + + // to handle race condition + while (!allowed && value > 0) { + allowed = remained.compareAndSet(value, value - 1); + value = remained.get(); + } + return allowed; + } +} |