diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 04ecea6..24034bd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -741,6 +741,20 @@
    */
  public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION = 0.25F;
 
+
+  /**
+   * The maximum fraction of disk space that may be utilized before a
+   * disk is marked as offline. If the value is greater than or equal to
+   * 1.0, the NM will only check whether the disk is full.
+   * This applies to nm-local-dirs and nm-log-dirs.
+   */
+  public static final String NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION =
+    NM_PREFIX + "disk-health-checker.max-space-utilization";
+  /**
+   * By default, 100% of the disk can be used before it is marked as offline.
+ */ + public static final float DEFAULT_NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION + = 1.0F; /** Frequency of running node health script.*/ public static final String NM_HEALTH_CHECK_INTERVAL_MS = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index 10362d2..1d3ec4d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -43,10 +44,14 @@ private List localDirs; private List failedDirs; private int numFailures; - - public DirectoryCollection(String[] dirs) { + + private float diskUtilizationCutoff; + + public DirectoryCollection(String[] dirs, + float utilizationCutOff) { localDirs = new CopyOnWriteArrayList(dirs); failedDirs = new CopyOnWriteArrayList(); + diskUtilizationCutoff = utilizationCutOff; } /** @@ -103,17 +108,39 @@ synchronized boolean createNonExistentDirs(FileContext localFs, */ synchronized boolean checkDirs() { int oldNumFailures = numFailures; + HashSet checkFailedDirs = new HashSet(); for (final String dir : localDirs) { try { - DiskChecker.checkDir(new File(dir)); + File testDir = new File(dir); + DiskChecker.checkDir(testDir); + float freePerc = + testDir.getUsableSpace()/(float)testDir.getTotalSpace(); + float usedPerc = 1.0F - freePerc; + if(usedPerc > 
diskUtilizationCutoff) { + LOG.warn("Directory " + dir + + " error, used space above threshold of " + + diskUtilizationCutoff + + ", removing from the list of valid directories."); + checkFailedDirs.add(dir); + } + else if(usedPerc >= 1.0F) { + LOG.warn("Directory " + dir + + " error, disk full" + + ", removing from the list of valid directories."); + checkFailedDirs.add(dir); + } + } catch (DiskErrorException de) { LOG.warn("Directory " + dir + " error " + de.getMessage() + ", removing from the list of valid directories."); - localDirs.remove(dir); - failedDirs.add(dir); - numFailures++; + checkFailedDirs.add(dir); } } + for(String dir: checkFailedDirs) { + localDirs.remove(dir); + failedDirs.add(dir); + numFailures++; + } return numFailures > oldNumFailures; } @@ -132,4 +159,13 @@ private void createDir(FileContext localFs, Path dir, FsPermission perm) } } } + + public float getDiskUtilizationCutoff() { + return diskUtilizationCutoff; + } + + public void setDiskUtilizationCutoff(float diskUtilizationCutoff) { + this.diskUtilizationCutoff = diskUtilizationCutoff; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index b9b46eb..3a020b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -90,9 +90,13 @@ public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException { localDirs = new DirectoryCollection( - 
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS))); + validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)), + conf.getFloat(YarnConfiguration.NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION, + YarnConfiguration.DEFAULT_NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION)); logDirs = new DirectoryCollection( - validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))); + validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)), + conf.getFloat(YarnConfiguration.NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION, + YarnConfiguration.DEFAULT_NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION)); localDirsAllocator = new LocalDirAllocator( YarnConfiguration.NM_LOCAL_DIRS); logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index 4ab61c9..5b08aaa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -55,8 +56,11 @@ public static void teardown() { @Test public void testConcurrentAccess() throws IOException { // Initialize DirectoryCollection with a file instead of a directory + Configuration conf = new Configuration(); String[] 
dirs = {testFile.getPath()}; - DirectoryCollection dc = new DirectoryCollection(dirs); + DirectoryCollection dc = new DirectoryCollection(dirs, + conf.getFloat(YarnConfiguration.NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION, + YarnConfiguration.DEFAULT_NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION)); // Create an iterator before checkDirs is called to reliable test case List list = dc.getGoodDirs(); @@ -88,7 +92,9 @@ public void testCreateDirectories() throws IOException { localFs.setPermission(pathC, permDirC); String[] dirs = { dirA, dirB, dirC }; - DirectoryCollection dc = new DirectoryCollection(dirs); + DirectoryCollection dc = new DirectoryCollection(dirs, + conf.getFloat(YarnConfiguration.NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION, + YarnConfiguration.DEFAULT_NM_HEALTHY_DISKS_MAX_SPACE_UTILIZATION)); FsPermission defaultPerm = FsPermission.getDefault() .applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm); @@ -104,4 +110,20 @@ public void testCreateDirectories() throws IOException { Assert.assertEquals("existing local directory permissions modified", permDirC, status.getPermission()); } + + @Test + public void testDiskSpaceUtilizationLimit() throws IOException { + + String dirA = new File(testDir, "dirA").getPath(); + String[] dirs = { dirA }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(1, dc.getFailedDirs().size()); + + DirectoryCollection dc2 = new DirectoryCollection(dirs, 1.0F); + dc2.checkDirs(); + Assert.assertEquals(1, dc2.getGoodDirs().size()); + Assert.assertEquals(0, dc2.getFailedDirs().size()); + } }