summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKezhu Wang <kezhuw@gmail.com>2022-09-25 16:21:45 +0800
committermaoling <maoling@apache.org>2022-09-25 16:21:45 +0800
commitbc1b231c9e32667b2978c86a6a64833470973dbd (patch)
treea359f15641eea2408d159361338ca89925426a44
parent9b6ec9060604347fc996fe69dc33a21222fbd0c4 (diff)
downloadzookeeper-bc1b231c9e32667b2978c86a6a64833470973dbd.tar.gz
ZOOKEEPER-4327: Fix flaky RequestThrottlerTest
This PR tries to fix several test failures in `RequestThrottlerTest`. First, `RequestThrottlerTest#testDropStaleRequests`. Placing `Thread.sleep(200)` after `submittedRequests.take()` in `RequestThrottler#run` will fail two assertions: 1. `assertEquals(2L, (long) metrics.get("prep_processor_request_queued"))` 2. `assertEquals(1L, (long) metrics.get("request_throttle_wait_count"))` This happens because `setStale` could happen before throttle handling. This commit solves this by introducing an interception point `RequestThrottler.throttleSleep` to build happens-before relations: 1. `throttling.countDown` happens before `setStale`; this ensures that unthrottled requests are processed as usual. 2. `setStale` happens before `throttled.await`; this defends `RequestThrottler.throttleSleep` against spurious wakeup. Second, `RequestThrottlerTest#testRequestThrottler`. * `RequestThrottlerTest.testRequestThrottler:197 expected: <2> but was: <1>` `ZooKeeperServer#submitRequest` and `PrepRequestProcessor#processRequest` run in different threads, thus there is no guarantee on metric `prep_processor_request_queued` after `submitted.await(5, TimeUnit.SECONDS)`. Placing `Thread.sleep(200)` before `zks.submitRequestNow(request)` in `RequestThrottler#run` will trigger this failure. * `RequestThrottlerTest.testRequestThrottler:206 expected: <5> but was: <4>` `entered.await(STALL_TIME, TimeUnit.MILLISECONDS)` could return `false` because its timeout is almost the same as that of `RequestThrottler#throttleSleep`. Placing `Thread.sleep(500)` around `throttleSleep` will increase the likelihood of failure. Third, `RequestThrottlerTest#testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled`. * `RequestThrottlerTest.testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled:340 expected: <3> but was: <4>` `ZooKeeperServer#shouldThrottle` depends on a consistent sum of `getInflight` and `getInProcess`. But that is not true.
Placing `Thread.sleep(200)` before `zks.submitRequestNow(request)` in `RequestThrottler#run` can reproduce this. See also https://github.com/apache/zookeeper/pull/1739, https://github.com/apache/zookeeper/pull/1821. Author: Kezhu Wang <kezhuw@gmail.com> Reviewers: Mate Szalay-Beko <symat@apache.org>, maoling <maoling@apache.org> Closes #1887 from kezhuw/ZOOKEEPER-4327-flaky-RequestThrottlerTest.testDropStaleRequests
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java12
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java5
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java66
3 files changed, 64 insertions, 19 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
index d60efa087..4a401e5b9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -195,13 +195,11 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}
- private synchronized void throttleSleep(int stallTime) {
- try {
- ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
- this.wait(stallTime);
- } catch (InterruptedException ie) {
- return;
- }
+
+ // @VisibleForTesting
+ synchronized void throttleSleep(int stallTime) throws InterruptedException {
+ ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
+ this.wait(stallTime);
}
@SuppressFBWarnings(value = "NN_NAKED_NOTIFY", justification = "state change is in ZooKeeperServer.decInProgress() ")
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 0303ca645..817e84b3e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -749,9 +749,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
protected void startRequestThrottler() {
- requestThrottler = new RequestThrottler(this);
+ requestThrottler = createRequestThrottler();
requestThrottler.start();
+ }
+ protected RequestThrottler createRequestThrottler() {
+ return new RequestThrottler(this);
}
protected void setupRequestProcessors() {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
index ed2239990..152592075 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
@@ -67,11 +67,17 @@ public class RequestThrottlerTest extends ZKTestCase {
CountDownLatch disconnected = null;
+ CountDownLatch throttled = null;
+ CountDownLatch throttling = null;
+
ZooKeeperServer zks = null;
ServerCnxnFactory f = null;
ZooKeeper zk = null;
int connectionLossCount = 0;
+ private long getCounterMetric(String name) {
+ return (long) MetricsUtils.currentServerMetrics().get(name);
+ }
@BeforeEach
public void setup() throws Exception {
@@ -116,6 +122,11 @@ public class RequestThrottlerTest extends ZKTestCase {
}
@Override
+ protected RequestThrottler createRequestThrottler() {
+ return new TestRequestThrottler(this);
+ }
+
+ @Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
@@ -141,6 +152,24 @@ public class RequestThrottlerTest extends ZKTestCase {
}
}
+ class TestRequestThrottler extends RequestThrottler {
+ public TestRequestThrottler(ZooKeeperServer zks) {
+ super(zks);
+ }
+
+ @Override
+ synchronized void throttleSleep(int stallTime) throws InterruptedException {
+ if (throttling != null) {
+ throttling.countDown();
+ }
+ super.throttleSleep(stallTime);
+ // Defend against unstable timing and potential spurious wakeup.
+ if (throttled != null) {
+ assertTrue(throttled.await(20, TimeUnit.SECONDS));
+ }
+ }
+ }
+
class TestPrepRequestProcessor extends PrepRequestProcessor {
public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
@@ -191,18 +220,22 @@ public class RequestThrottlerTest extends ZKTestCase {
// make sure the server received all 5 requests
submitted.await(5, TimeUnit.SECONDS);
- Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
// but only two requests can get into the pipeline because of the throttler
- assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
- assertEquals(1L, (long) metrics.get("request_throttle_wait_count"));
+ WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2;
+ waitFor("request not queued", requestQueued, 5);
+
+ WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1;
+ waitFor("no throttle wait", throttleWait, 5);
// let the requests go through the pipeline and the throttler will be waken up to allow more requests
// to enter the pipeline
resumeProcess.countDown();
- entered.await(STALL_TIME, TimeUnit.MILLISECONDS);
- metrics = MetricsUtils.currentServerMetrics();
+ // wait for more than one STALL_TIME to reduce timeout before wakeup
+ assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS));
+
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
assertEquals(TOTAL_REQUESTS, (long) metrics.get("prep_processor_request_queued"));
}
@@ -221,6 +254,9 @@ public class RequestThrottlerTest extends ZKTestCase {
resumeProcess = new CountDownLatch(1);
submitted = new CountDownLatch(TOTAL_REQUESTS);
+ throttled = new CountDownLatch(1);
+ throttling = new CountDownLatch(1);
+
// send 5 requests asynchronously
for (int i = 0; i < TOTAL_REQUESTS; i++) {
zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
@@ -231,11 +267,18 @@ public class RequestThrottlerTest extends ZKTestCase {
// make sure the server received all 5 requests
assertTrue(submitted.await(5, TimeUnit.SECONDS));
+ // stale throttled requests
+ assertTrue(throttling.await(5, TimeUnit.SECONDS));
for (ServerCnxn cnxn : f.cnxns) {
cnxn.setStale();
}
+ throttled.countDown();
zk = null;
+ // only first three requests are counted as finished
+ finished = new CountDownLatch(3);
+
+ // let the requests go through the pipeline
resumeProcess.countDown();
LOG.info("raise the latch");
@@ -243,6 +286,8 @@ public class RequestThrottlerTest extends ZKTestCase {
Thread.sleep(50);
}
+ assertTrue(finished.await(5, TimeUnit.SECONDS));
+
// assert after all requests processed to avoid concurrent issues as metrics are
// counted in different threads.
Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
@@ -327,7 +372,6 @@ public class RequestThrottlerTest extends ZKTestCase {
RequestThrottler.setMaxRequests(0);
resumeProcess = new CountDownLatch(1);
int totalRequests = 10;
- submitted = new CountDownLatch(totalRequests);
for (int i = 0; i < totalRequests; i++) {
zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
@@ -335,16 +379,16 @@ public class RequestThrottlerTest extends ZKTestCase {
}, null);
}
- submitted.await(5, TimeUnit.SECONDS);
-
// We should start throttling instead of queuing more requests.
//
// We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of requests coming in request processing pipeline
// before throttling. For the next request, we will throttle by disabling receiving future requests but we still
- // allow this single request coming in. So the total number of queued requests in processing pipeline would
+ // allow this single request coming in. Ideally, the total number of queued requests in processing pipeline would
// be GLOBAL_OUTSTANDING_LIMIT + 2.
- assertEquals(Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2,
- (long) MetricsUtils.currentServerMetrics().get("prep_processor_request_queued"));
+ //
+ // But due to lack of a consistent view of the number of outstanding requests, the number could be larger.
+ WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
+ waitFor("no enough requests queued", requestQueued, 5);
resumeProcess.countDown();
} catch (Exception e) {