Details
Type: Bug
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version/s: 3.9.0
None
None
Download ZooKeeper 3.9.0-SNAPSHOT from the GitHub repository (https://github.com/apache/zookeeper).
Then run:
mvn test -Dmaven.test.failure.ignore=true -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers -DfailIfNoTests=false -DredirectTestOutputToFile=false
Description
When we run:
mvn test -Dmaven.test.failure.ignore=true -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers -DfailIfNoTests=false -DredirectTestOutputToFile=false
the following two methods of class org.apache.zookeeper.server.watch.WatcherCleaner access the shared deadWatchers set concurrently:
public void addDeadWatcher(int watcherBit) {
    // Wait if there are too many watchers waiting to be closed,
    // this is will slow down the socket packet processing and
    // the adding watches in the ZK pipeline.
    while (maxInProcessingDeadWatchers > 0 && !stopped
            && totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
        try {
            RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
            long startTime = Time.currentElapsedTime();
            synchronized (processingCompletedEvent) {
                processingCompletedEvent.wait(100);
            }
            long latency = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
        } catch (InterruptedException e) {
            LOG.info("Got interrupted while waiting for dead watches queue size");
            break;
        }
    }
    synchronized (this) {
        if (deadWatchers.add(watcherBit)) {
            totalDeadWatchers.incrementAndGet();
            ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
            if (deadWatchers.size() >= watcherCleanThreshold) {
                synchronized (cleanEvent) {
                    cleanEvent.notifyAll();
                }
            }
        }
    }
}
@Override
public void run() {
    while (!stopped) {
        synchronized (cleanEvent) {
            try {
                // add some jitter to avoid cleaning dead watchers at the
                // same time in the quorum
                if (!stopped && deadWatchers.size() < watcherCleanThreshold) {
                    int maxWaitMs = (watcherCleanIntervalInSeconds
                        + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                    cleanEvent.wait(maxWaitMs);
                }
            } catch (InterruptedException e) {
                LOG.info("Received InterruptedException while waiting for cleanEvent");
                break;
            }
        }
        if (deadWatchers.isEmpty()) {
            continue;
        }
        synchronized (this) {
            // Clean the dead watchers need to go through all the current
            // watches, which is pretty heavy and may take a second if
            // there are millions of watches, that's why we're doing lazily
            // batch clean up in a separate thread with a snapshot of the
            // current dead watchers.
            final Set<Integer> snapshot = new HashSet<>(deadWatchers);
            deadWatchers.clear();
            int total = snapshot.size();
            LOG.info("Processing {} dead watchers", total);
            cleaners.schedule(new WorkRequest() {
                @Override
                public void doWork() throws Exception {
                    long startTime = Time.currentElapsedTime();
                    listener.processDeadWatchers(snapshot);
                    long latency = Time.currentElapsedTime() - startTime;
                    LOG.info("Takes {} to process {} watches", latency, total);
                    ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
                    ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
                    totalDeadWatchers.addAndGet(-total);
                    synchronized (processingCompletedEvent) {
                        processingCompletedEvent.notifyAll();
                    }
                }
            });
        }
    }
    LOG.info("WatcherCleaner thread exited");
}
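For reference, the timed wait in run() computes maxWaitMs as the base interval plus a random jitter of 0 to watcherCleanIntervalInSeconds / 2 whole seconds, converted to milliseconds. The sketch below isolates that arithmetic; CleanerJitterSketch, maxWaitMs() and upperBoundMs() are hypothetical names, not part of ZooKeeper.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper class (not part of ZooKeeper) that isolates the
// wait-time arithmetic used in WatcherCleaner.run().
class CleanerJitterSketch {

    // Same expression as in run(): the base interval plus a random extra of
    // 0..(interval / 2) whole seconds, converted to milliseconds.
    // nextInt(bound) returns a value in [0, bound), hence the "+ 1".
    static int maxWaitMs(int watcherCleanIntervalInSeconds) {
        return (watcherCleanIntervalInSeconds
                + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
    }

    // Largest value maxWaitMs() can return for a given interval
    // (integer division, matching the expression above).
    static int upperBoundMs(int watcherCleanIntervalInSeconds) {
        return (watcherCleanIntervalInSeconds + watcherCleanIntervalInSeconds / 2) * 1000;
    }
}
```

For example, with an assumed interval of 600 seconds (an illustrative value, not necessarily the configured one), maxWaitMs(600) returns a multiple of 1000 between 600000 and 900000, and upperBoundMs(600) returns 900000.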
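As we can see, the two methods access the deadWatchers object from different threads. The cleaner thread in run() reads deadWatchers: it calls deadWatchers.size() while holding only the cleanEvent monitor, and deadWatchers.isEmpty() while holding no lock at all. The thread calling addDeadWatcher() writes to deadWatchers with deadWatchers.add() while holding the WatcherCleaner instance monitor (synchronized (this)). Because these reads and the write are not protected by a common lock, this is a data race.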
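One possible direction for a fix, sketched below and not taken from any committed ZooKeeper patch, is to make every access to deadWatchers hold the same monitor (this) that addDeadWatcher() already uses for its write. DeadWatcherQueueSketch and its methods add(), pendingCount(), drain() and awaitWork() are hypothetical names; the sketch also assumes deadWatchers is a plain, non-thread-safe HashSet<Integer>, and it omits the metrics and totalDeadWatchers bookkeeping.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the dead-watcher queue in
// WatcherCleaner. Assumption: deadWatchers is a HashSet guarded by `this`.
class DeadWatcherQueueSketch {
    private final Set<Integer> deadWatchers = new HashSet<>();
    private final Object cleanEvent = new Object();
    private final int watcherCleanThreshold;

    DeadWatcherQueueSketch(int watcherCleanThreshold) {
        this.watcherCleanThreshold = watcherCleanThreshold;
    }

    // Writer side (like addDeadWatcher): mutate under `this`, but release
    // `this` before taking cleanEvent so the two monitors are never nested
    // in the order this -> cleanEvent.
    void add(int watcherBit) {
        boolean reachedThreshold;
        synchronized (this) {
            if (!deadWatchers.add(watcherBit)) {
                return; // duplicate: nothing queued, no notification
            }
            reachedThreshold = deadWatchers.size() >= watcherCleanThreshold;
        }
        if (reachedThreshold) {
            synchronized (cleanEvent) {
                cleanEvent.notifyAll();
            }
        }
    }

    // Reader side: size()/isEmpty() checks in run() would go through this
    // method, so they hold the same lock as the write in add().
    synchronized int pendingCount() {
        return deadWatchers.size();
    }

    // Equivalent of the snapshot-and-clear block in run(), done atomically.
    synchronized Set<Integer> drain() {
        Set<Integer> snapshot = new HashSet<>(deadWatchers);
        deadWatchers.clear();
        return snapshot;
    }

    // Simplified wait step of run(): the only nesting is cleanEvent -> this.
    void awaitWork(long maxWaitMs) throws InterruptedException {
        synchronized (cleanEvent) {
            if (pendingCount() < watcherCleanThreshold) {
                cleanEvent.wait(maxWaitMs);
            }
        }
    }
}
```

Moving the notification out of the synchronized (this) block is deliberate. If run() simply wrapped deadWatchers.size() in synchronized (this) while still holding cleanEvent, it would acquire the monitors in the order cleanEvent, then this, while the current addDeadWatcher() acquires this, then cleanEvent; that lock-order inversion can deadlock. Switching to a concurrent set (for example ConcurrentHashMap.newKeySet()) would remove the race on size() and isEmpty(), but the copy-then-clear() step in run() would then need extra care, since a watcher added between the copy and clear() would be dropped.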