summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKezhu Wang <kezhuw@gmail.com>2023-05-05 15:58:42 +0800
committerGitHub <noreply@github.com>2023-05-05 15:58:42 +0800
commita64dbf5b06ca1a73dc2ad6c7d31e679e312082aa (patch)
tree4453eebb9d0feae83f25fb926851ad459fba86c3
parent89c1831f84891f425f1fa9224210587124f1c1ec (diff)
downloadzookeeper-a64dbf5b06ca1a73dc2ad6c7d31e679e312082aa.tar.gz
ZOOKEEPER-4466: Support different watch modes on same path (#1859)HEADmaster
Signed-off-by: Kezhu Wang <kezhuw@gmail.com> Co-authored-by: tison <wander4096@gmail.com>
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java5
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java9
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java115
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java89
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java2
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java96
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java50
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java48
8 files changed, 219 insertions, 195 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index a6f605390..603cb0b38 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -675,7 +675,9 @@ public class DataTree {
public void addWatch(String basePath, Watcher watcher, int mode) {
WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
dataWatches.addWatch(basePath, watcher, watcherMode);
- childWatches.addWatch(basePath, watcher, watcherMode);
+ if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
+ childWatches.addWatch(basePath, watcher, watcherMode);
+ }
}
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
@@ -1511,7 +1513,6 @@ public class DataTree {
this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
}
for (String path : persistentRecursiveWatches) {
- this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
index 1bc44c805..4eea5eca0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
@@ -144,13 +144,4 @@ public interface IWatchManager {
*
*/
void dumpWatches(PrintWriter pwriter, boolean byPath);
-
- /**
- * Return the current number of recursive watchers
- *
- * @return qty
- */
- default int getRecursiveWatchQty() {
- return 0;
- }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
index c5b133059..c85c3d846 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
@@ -19,6 +19,7 @@
package org.apache.zookeeper.server.watch;
import java.io.PrintWriter;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,9 +46,9 @@ public class WatchManager implements IWatchManager {
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
- private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
+ private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();
- private final WatcherModeManager watcherModeManager = new WatcherModeManager();
+ private int recursiveWatchQty = 0;
@Override
public synchronized int size() {
@@ -84,25 +85,34 @@ public class WatchManager implements IWatchManager {
}
list.add(watcher);
- Set<String> paths = watch2Paths.get(watcher);
+ Map<String, WatchStats> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
- paths = new HashSet<>();
+ paths = new HashMap<>();
watch2Paths.put(watcher, paths);
}
- watcherModeManager.setWatcherMode(watcher, path, watcherMode);
+ WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
+ WatchStats newStats = stats.addMode(watcherMode);
- return paths.add(path);
+ if (newStats != stats) {
+ paths.put(path, newStats);
+ if (watcherMode.isRecursive()) {
+ ++recursiveWatchQty;
+ }
+ return true;
+ }
+
+ return false;
}
@Override
public synchronized void removeWatcher(Watcher watcher) {
- Set<String> paths = watch2Paths.remove(watcher);
+ Map<String, WatchStats> paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
}
- for (String p : paths) {
+ for (String p : paths.keySet()) {
Set<Watcher> list = watchTable.get(p);
if (list != null) {
list.remove(watcher);
@@ -110,7 +120,11 @@ public class WatchManager implements IWatchManager {
watchTable.remove(p);
}
}
- watcherModeManager.removeWatcher(watcher, p);
+ }
+ for (WatchStats stats : paths.values()) {
+ if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+ --recursiveWatchQty;
+ }
}
}
@@ -123,8 +137,8 @@ public class WatchManager implements IWatchManager {
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
- PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
+ PathParentIterator pathParentIterator = getPathParentIterator(path);
for (String localPath : pathParentIterator.asIterable()) {
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
@@ -133,20 +147,23 @@ public class WatchManager implements IWatchManager {
Iterator<Watcher> iterator = thisWatchers.iterator();
while (iterator.hasNext()) {
Watcher watcher = iterator.next();
- WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
- if (watcherMode.isRecursive()) {
- if (type != EventType.NodeChildrenChanged) {
- watchers.add(watcher);
- }
- } else if (!pathParentIterator.atParentPath()) {
+ Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
+ WatchStats stats = paths.get(localPath);
+ if (stats == null) {
+ LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
+ continue;
+ }
+ if (!pathParentIterator.atParentPath()) {
watchers.add(watcher);
- if (!watcherMode.isPersistent()) {
+ WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
+ if (newStats == WatchStats.NONE) {
iterator.remove();
- Set<String> paths = watch2Paths.get(watcher);
- if (paths != null) {
- paths.remove(localPath);
- }
+ paths.remove(localPath);
+ } else if (newStats != stats) {
+ paths.put(localPath, newStats);
}
+ } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+ watchers.add(watcher);
}
}
if (thisWatchers.isEmpty()) {
@@ -199,7 +216,7 @@ public class WatchManager implements IWatchManager {
sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n");
int total = 0;
- for (Set<String> paths : watch2Paths.values()) {
+ for (Map<String, WatchStats> paths : watch2Paths.values()) {
total += paths.size();
}
sb.append("Total watches:").append(total);
@@ -219,10 +236,10 @@ public class WatchManager implements IWatchManager {
}
}
} else {
- for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
+ for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
pwriter.print("0x");
pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId()));
- for (String path : e.getValue()) {
+ for (String path : e.getValue().keySet()) {
pwriter.print("\t");
pwriter.println(path);
}
@@ -232,31 +249,28 @@ public class WatchManager implements IWatchManager {
@Override
public synchronized boolean containsWatcher(String path, Watcher watcher) {
- WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, path);
- PathParentIterator pathParentIterator = getPathParentIterator(path);
- for (String localPath : pathParentIterator.asIterable()) {
- Set<Watcher> watchers = watchTable.get(localPath);
- if (!pathParentIterator.atParentPath()) {
- if (watchers != null) {
- return true; // at the leaf node, all watcher types match
- }
- }
- if (watcherMode.isRecursive()) {
- return true;
- }
- }
- return false;
+ Set<Watcher> list = watchTable.get(path);
+ return list != null && list.contains(watcher);
}
@Override
public synchronized boolean removeWatcher(String path, Watcher watcher) {
- Set<String> paths = watch2Paths.get(watcher);
- if (paths == null || !paths.remove(path)) {
+ Map<String, WatchStats> paths = watch2Paths.get(watcher);
+ if (paths == null) {
return false;
}
+ WatchStats stats = paths.remove(path);
+ if (stats == null) {
+ return false;
+ }
+ if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+ --recursiveWatchQty;
+ }
+
Set<Watcher> list = watchTable.get(path);
if (list == null || !list.remove(watcher)) {
+ LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
return false;
}
@@ -264,17 +278,20 @@ public class WatchManager implements IWatchManager {
watchTable.remove(path);
}
- watcherModeManager.removeWatcher(watcher, path);
-
return true;
}
+ // VisibleForTesting
+ Map<Watcher, Map<String, WatchStats>> getWatch2Paths() {
+ return watch2Paths;
+ }
+
@Override
public synchronized WatchesReport getWatches() {
Map<Long, Set<String>> id2paths = new HashMap<>();
- for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
+ for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
Long id = ((ServerCnxn) e.getKey()).getSessionId();
- Set<String> paths = new HashSet<>(e.getValue());
+ Set<String> paths = new HashSet<>(e.getValue().keySet());
id2paths.put(id, paths);
}
return new WatchesReport(id2paths);
@@ -296,7 +313,7 @@ public class WatchManager implements IWatchManager {
@Override
public synchronized WatchesSummary getWatchesSummary() {
int totalWatches = 0;
- for (Set<String> paths : watch2Paths.values()) {
+ for (Map<String, WatchStats> paths : watch2Paths.values()) {
totalWatches += paths.size();
}
return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches);
@@ -305,13 +322,13 @@ public class WatchManager implements IWatchManager {
@Override
public void shutdown() { /* do nothing */ }
- @Override
- public int getRecursiveWatchQty() {
- return watcherModeManager.getRecursiveQty();
+ // VisibleForTesting
+ synchronized int getRecursiveWatchQty() {
+ return recursiveWatchQty;
}
private PathParentIterator getPathParentIterator(String path) {
- if (watcherModeManager.getRecursiveQty() == 0) {
+ if (getRecursiveWatchQty() == 0) {
return PathParentIterator.forPathOnly(path);
}
return PathParentIterator.forAll(path);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java
new file mode 100644
index 000000000..fd0c0259e
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+/**
+ * Statistics for multiple different watches on one node.
+ */
+public final class WatchStats {
+ private static final WatchStats[] WATCH_STATS = new WatchStats[] {
+ new WatchStats(0), // NONE
+ new WatchStats(1), // STANDARD
+ new WatchStats(2), // PERSISTENT
+ new WatchStats(3), // STANDARD + PERSISTENT
+ new WatchStats(4), // PERSISTENT_RECURSIVE
+ new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
+ new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
+ new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
+ };
+
+ /**
+ * Stats that have no watchers attached.
+ *
+ * <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}.
+ */
+ public static final WatchStats NONE = WATCH_STATS[0];
+
+ private final int flags;
+
+ private WatchStats(int flags) {
+ this.flags = flags;
+ }
+
+ private static int modeToFlag(WatcherMode mode) {
+ return 1 << mode.ordinal();
+ }
+
+ /**
+ * Compute stats after given mode attached to node.
+ *
+ * @param mode watcher mode
+ * @return a new stats if given mode is not attached to this node before, otherwise old stats
+ */
+ public WatchStats addMode(WatcherMode mode) {
+ int flags = this.flags | modeToFlag(mode);
+ return WATCH_STATS[flags];
+ }
+
+ /**
+ * Compute stats after given mode removed from node.
+ *
+ * @param mode watcher mode
+ * @return null if given mode is the last attached mode, otherwise a new stats
+ */
+ public WatchStats removeMode(WatcherMode mode) {
+ int mask = ~modeToFlag(mode);
+ int flags = this.flags & mask;
+ if (flags == 0) {
+ return NONE;
+ }
+ return WATCH_STATS[flags];
+ }
+
+ /**
+ * Check whether given mode is attached to this node.
+ *
+ * @param mode watcher mode
+ * @return true if given mode is attached to this node.
+ */
+ public boolean hasMode(WatcherMode mode) {
+ int flags = modeToFlag(mode);
+ return (this.flags & flags) != 0;
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
index b8a1dda74..e05ba900e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
@@ -23,7 +23,7 @@ import org.apache.zookeeper.ZooDefs;
public enum WatcherMode {
STANDARD(false, false),
PERSISTENT(true, false),
- PERSISTENT_RECURSIVE(true, true)
+ PERSISTENT_RECURSIVE(true, true),
;
public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
deleted file mode 100644
index c1a8225f8..000000000
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.watch;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.zookeeper.Watcher;
-
-class WatcherModeManager {
- private final Map<Key, WatcherMode> watcherModes = new ConcurrentHashMap<>();
- private final AtomicInteger recursiveQty = new AtomicInteger(0);
-
- private static class Key {
- private final Watcher watcher;
- private final String path;
-
- Key(Watcher watcher, String path) {
- this.watcher = watcher;
- this.path = path;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Key key = (Key) o;
- return watcher.equals(key.watcher) && path.equals(key.path);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(watcher, path);
- }
- }
-
- // VisibleForTesting
- Map<Key, WatcherMode> getWatcherModes() {
- return watcherModes;
- }
-
- void setWatcherMode(Watcher watcher, String path, WatcherMode mode) {
- if (mode == WatcherMode.DEFAULT_WATCHER_MODE) {
- removeWatcher(watcher, path);
- } else {
- adjustRecursiveQty(watcherModes.put(new Key(watcher, path), mode), mode);
- }
- }
-
- WatcherMode getWatcherMode(Watcher watcher, String path) {
- return watcherModes.getOrDefault(new Key(watcher, path), WatcherMode.DEFAULT_WATCHER_MODE);
- }
-
- void removeWatcher(Watcher watcher, String path) {
- adjustRecursiveQty(watcherModes.remove(new Key(watcher, path)), WatcherMode.DEFAULT_WATCHER_MODE);
- }
-
- int getRecursiveQty() {
- return recursiveQty.get();
- }
-
- // recursiveQty is an optimization to avoid having to walk the map every time this value is needed
- private void adjustRecursiveQty(WatcherMode oldMode, WatcherMode newMode) {
- if (oldMode == null) {
- oldMode = WatcherMode.DEFAULT_WATCHER_MODE;
- }
- if (oldMode.isRecursive() != newMode.isRecursive()) {
- if (newMode.isRecursive()) {
- recursiveQty.incrementAndGet();
- } else {
- recursiveQty.decrementAndGet();
- }
- }
- }
-}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
index 78b13bb33..0582ddafc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
@@ -52,28 +52,6 @@ public class RecursiveWatchQtyTest {
}
@Test
- public void testRecursiveQty() {
- WatcherModeManager manager = new WatcherModeManager();
- DummyWatcher watcher = new DummyWatcher();
- manager.setWatcherMode(watcher, "/a", WatcherMode.DEFAULT_WATCHER_MODE);
- assertEquals(0, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
- assertEquals(1, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
- assertEquals(2, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
- assertEquals(2, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT);
- assertEquals(1, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
- assertEquals(2, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a/b", WatcherMode.DEFAULT_WATCHER_MODE);
- assertEquals(1, manager.getRecursiveQty());
- manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT);
- assertEquals(0, manager.getRecursiveQty());
- }
-
- @Test
public void testAddRemove() {
Watcher watcher1 = new DummyWatcher();
Watcher watcher2 = new DummyWatcher();
@@ -125,7 +103,7 @@ public class RecursiveWatchQtyTest {
}
@Test
- public void testChangeType() {
+ public void testDifferentWatchModes() {
Watcher watcher = new DummyWatcher();
watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT);
@@ -133,15 +111,14 @@ public class RecursiveWatchQtyTest {
watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
assertEquals(1, watchManager.getRecursiveWatchQty());
watchManager.addWatch("/a", watcher, WatcherMode.STANDARD);
- assertEquals(0, watchManager.getRecursiveWatchQty());
+ assertEquals(1, watchManager.getRecursiveWatchQty());
assertTrue(watchManager.removeWatcher("/a", watcher));
assertEquals(0, watchManager.getRecursiveWatchQty());
}
@Test
- public void testRecursiveQtyConcurrency() {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- WatcherModeManager manager = new WatcherModeManager();
+ public void testRecursiveQtyConcurrency() throws Exception {
+ WatchManager manager = new WatchManager();
ExecutorService threadPool = Executors.newFixedThreadPool(clientQty);
List<Future<?>> tasks = null;
CountDownLatch completedLatch = new CountDownLatch(clientQty);
@@ -149,11 +126,7 @@ public class RecursiveWatchQtyTest {
tasks = IntStream.range(0, clientQty)
.mapToObj(__ -> threadPool.submit(() -> iterate(manager, completedLatch)))
.collect(Collectors.toList());
- try {
- completedLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ completedLatch.await();
} finally {
if (tasks != null) {
tasks.forEach(t -> t.cancel(true));
@@ -161,14 +134,15 @@ public class RecursiveWatchQtyTest {
threadPool.shutdownNow();
}
- int expectedRecursiveQty = (int) manager.getWatcherModes().values()
+ int expectedRecursiveQty = (int) manager.getWatch2Paths().values()
.stream()
- .filter(mode -> mode == WatcherMode.PERSISTENT_RECURSIVE)
+ .flatMap(paths -> paths.values().stream())
+ .filter(stats -> stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE))
.count();
- assertEquals(expectedRecursiveQty, manager.getRecursiveQty());
+ assertEquals(expectedRecursiveQty, manager.getRecursiveWatchQty());
}
- private void iterate(WatcherModeManager manager, CountDownLatch completedLatch) {
+ private void iterate(WatchManager manager, CountDownLatch completedLatch) {
ThreadLocalRandom random = ThreadLocalRandom.current();
try {
for (int i = 0; i < iterations; ++i) {
@@ -176,9 +150,9 @@ public class RecursiveWatchQtyTest {
boolean doSet = random.nextInt(100) > 33; // 2/3 will be sets
if (doSet) {
WatcherMode mode = WatcherMode.values()[random.nextInt(WatcherMode.values().length)];
- manager.setWatcherMode(new DummyWatcher(), path, mode);
+ manager.addWatch(path, new DummyWatcher(), mode);
} else {
- manager.removeWatcher(new DummyWatcher(), path);
+ manager.removeWatcher(path, new DummyWatcher());
}
int sleepMillis = random.nextInt(2);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
index e74ee2fd6..077af3c45 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
@@ -18,6 +18,7 @@
package org.apache.zookeeper.test;
+import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -172,6 +173,53 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
}
@Test
+ public void testSamePathWithDifferentWatchModes() throws Exception {
+ try (ZooKeeper zk = createClient()) {
+ BlockingQueue<WatchedEvent> dataEvents = new LinkedBlockingQueue<>();
+ BlockingQueue<WatchedEvent> childEvents = new LinkedBlockingQueue<>();
+ BlockingQueue<WatchedEvent> persistentEvents = new LinkedBlockingQueue<>();
+ BlockingQueue<WatchedEvent> recursiveEvents = new LinkedBlockingQueue<>();
+
+ zk.addWatch("/a", persistentEvents::add, PERSISTENT);
+ zk.addWatch("/a", recursiveEvents::add, PERSISTENT_RECURSIVE);
+ zk.exists("/a", dataEvents::add);
+
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(dataEvents, Watcher.Event.EventType.NodeCreated, "/a");
+ assertEvent(persistentEvents, Watcher.Event.EventType.NodeCreated, "/a");
+ assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a");
+
+ zk.getData("/a", dataEvents::add, null);
+ zk.setData("/a", new byte[0], -1);
+ assertEvent(dataEvents, Watcher.Event.EventType.NodeDataChanged, "/a");
+ assertEvent(persistentEvents, Watcher.Event.EventType.NodeDataChanged, "/a");
+ assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDataChanged, "/a");
+
+ zk.getChildren("/a", childEvents::add);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+ assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+ assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a/b");
+
+ zk.getChildren("/a", childEvents::add);
+ zk.delete("/a/b", -1);
+ assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+ assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+ assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a/b");
+
+ zk.getChildren("/a", childEvents::add);
+ zk.getData("/a", dataEvents::add, null);
+ zk.exists("/a", dataEvents::add);
+ zk.delete("/a", -1);
+ assertEvent(childEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+ assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+ assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+ assertEvent(persistentEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+ assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+ }
+ }
+
+ @Test
public void testRootWatcher()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {