diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index 6aee6a8f75d..29d257d4d72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; /** * Manages a list of local storage directories. @@ -97,9 +96,9 @@ } // Good local storage directories - private List localDirs; - private List errorDirs; - private List fullDirs; + private CopyOnWriteArrayList localDirs; + private CopyOnWriteArrayList errorDirs; + private CopyOnWriteArrayList fullDirs; private Map directoryErrorInfo; // read/write lock for accessing above directories. @@ -275,7 +274,7 @@ void deregisterDirsChangeListener( List getGoodDirs() { this.readLock.lock(); try { - return ImmutableList.copyOf(localDirs); + return Collections.unmodifiableList(localDirs); } finally { this.readLock.unlock(); } @@ -287,7 +286,7 @@ void deregisterDirsChangeListener( List getFailedDirs() { this.readLock.lock(); try { - return ImmutableList.copyOf( + return Collections.unmodifiableList( DirectoryCollection.concat(errorDirs, fullDirs)); } finally { this.readLock.unlock(); @@ -300,7 +299,7 @@ void deregisterDirsChangeListener( List getFullDirs() { this.readLock.lock(); try { - return ImmutableList.copyOf(fullDirs); + return Collections.unmodifiableList(fullDirs); } finally { this.readLock.unlock(); } @@ -439,62 +438,61 @@ boolean checkDirs() { this.writeLock.lock(); try { - localDirs.clear(); - errorDirs.clear(); - fullDirs.clear(); directoryErrorInfo.clear(); - - for (Map.Entry entry : dirsFailedCheck - .entrySet()) { - String dir = entry.getKey(); - DiskErrorInformation errorInformation = entry.getValue(); - - switch (entry.getValue().cause) { - case DISK_FULL: - fullDirs.add(entry.getKey()); - break; - case OTHER: - errorDirs.add(entry.getKey()); - break; - default: - LOG.warn(entry.getValue().cause + " is unknown for disk error."); - break; - } - directoryErrorInfo.put(entry.getKey(), errorInformation); - - if (preCheckGoodDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error, " + errorInformation.message - + ", removing from list of valid directories"); - setChanged = true; - numFailures++; - } - } for (String dir : allLocalDirs) { - if (!dirsFailedCheck.containsKey(dir)) { - localDirs.add(dir); - if (preCheckFullDirs.contains(dir) - || preCheckOtherErrorDirs.contains(dir)) { + if (dirsFailedCheck.containsKey(dir)) { + // Handle failed dirs + DiskErrorInformation errorInformation = dirsFailedCheck.get(dir); + if (preCheckGoodDirs.contains(dir)) { + localDirs.remove(dir); + LOG.warn("Directory " + dir + " error, " + errorInformation.message + + ", removing from list of valid directories"); + setChanged = true; + numFailures++; + } + switch (errorInformation.cause) { + case DISK_FULL: + if (!preCheckFullDirs.contains(dir)) { + fullDirs.addIfAbsent(dir); + if (preCheckOtherErrorDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + errorInformation.message); + } + } + break; + case OTHER: + if (!preCheckOtherErrorDirs.contains(dir)) { + errorDirs.addIfAbsent(dir); + if (preCheckFullDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + errorInformation.message); + } + } + break; + default: + LOG.warn("Directory " + dir + " cause " + + errorInformation.cause + " is unknown for disk error " + + errorInformation.message); + break; + } + directoryErrorInfo.put(dir, errorInformation); + } else { + // Handle good dirs + if (!preCheckGoodDirs.contains(dir)) { + localDirs.addIfAbsent(dir); setChanged = true; LOG.info("Directory " + dir + " passed disk check, adding to list of valid directories."); } - } - } - Set postCheckFullDirs = new HashSet(fullDirs); - Set postCheckOtherDirs = new HashSet(errorDirs); - for (String dir : preCheckFullDirs) { - if (postCheckOtherDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error " - + dirsFailedCheck.get(dir).message); + if (preCheckFullDirs.contains(dir)) { + fullDirs.remove(dir); + } + if (preCheckOtherErrorDirs.contains(dir)) { + errorDirs.remove(dir); + } } } - for (String dir : preCheckOtherErrorDirs) { - if (postCheckFullDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error " - + dirsFailedCheck.get(dir).message); - } - } setGoodDirsDiskUtilizationPercentage(); if (setChanged) { for (DirsChangeListener listener : dirsChangeListeners) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index b99c7d625ee..b67e8b7c1ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -419,6 +419,96 @@ public void testDirsChangeListener() { Assert.assertEquals(listener3.num, 1); } + @Test + public void testCheckFailedDirsRace() throws IOException { + String dir = new File(testDir, "dir").getPath(); + String[] dirs = { dir }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + + // Run once for the lists to initially populate + dc.checkDirs(); + + List failedDirs = dc.getFailedDirs(); + checkDirsRace(dc, failedDirs, "failedDirs", 1); + } + + @Test + public void testCheckFullDirsRace() throws IOException { + String dir = new File(testDir, "dir").getPath(); + String[] dirs = { dir }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + + // Run once for the lists to initially populate + dc.checkDirs(); + + List fullDirs = dc.getFullDirs(); + checkDirsRace(dc, fullDirs, "fullDirs", 1); + } + + @Test + public void testCheckErroredDirsRace() throws IOException { + String dir = new File(testDir, "dir").getPath(); + Path path = new Path(dir); + FsPermission permDirB = new FsPermission((short) 0400); + + localFs.mkdir(path, null, true); + localFs.setPermission(path, permDirB); + + String[] dirs = { dir }; + + DirectoryCollection dc = + new DirectoryCollection(dirs, conf.getFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); + + // Run once for the lists to initially populate + dc.checkDirs(); + + List erroredDirs = dc.getErroredDirs(); + checkDirsRace(dc, erroredDirs, "erroredDirs", 1); + } + + @Test + public void testCheckGoodDirsRace() throws IOException { + String dir = new File(testDir, "dir").getPath(); + + String[] dirs = { dir }; + DirectoryCollection dc = + new DirectoryCollection(dirs, conf.getFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); + + // Run once for the lists to initially populate + dc.checkDirs(); + + List goodDirs = dc.getGoodDirs(); + checkDirsRace(dc, goodDirs, "goodDirs", 1); + } + + private void checkDirsRace(DirectoryCollection dc, List dirs, + String dirsType, int expectedDirsSize) { + Assert.assertEquals("Unexpected number of " + dirsType, + expectedDirsSize, dirs.size()); + + Thread checkDirsThread = new Thread() { + public void run() { + // The race should be caught on the first iteration, but adding a few + // more just in case + for (int i = 0; i < 10; i++) { + dc.checkDirs(); + } + } + }; + checkDirsThread.start(); + + // In testing, this race manifests on iteration 30000-50000. Quadrupling + // that to make sure that it gets exposed + for (int i = 0; i < 200000; i++) { + Assert.assertEquals("Unexpected number of " + dirsType + " on iteration " + + i, expectedDirsSize, dirs.size()); + } + } + static class DirsChangeListenerTest implements DirsChangeListener { public int num = 0; public DirsChangeListenerTest() {