From c58e312831ea3ecd650f3f64c423974c7466970c Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Wed, 27 Jan 2016 08:12:36 +0000 Subject: [PATCH] YARN-4301. NM disk health checker should have a timeout. This fails due to "java.lang.RuntimeException: strange NPE" at DirectoryCollection.java:381 --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 6 ++ .../src/main/resources/yarn-default.xml | 6 ++ .../server/nodemanager/DirectoryCollection.java | 105 ++++++++++++++++++++- .../nodemanager/LocalDirsHandlerService.java | 15 ++- .../nodemanager/TestDirectoryCollection.java | 26 +++++ 5 files changed, 153 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e214a86..0ee7d4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1097,6 +1097,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS = 2 * 60 * 1000; + /* timeout of running disks' health checker. */ + public static final String NM_DISK_HEALTH_CHECK_TIMEOUT_MS = + NM_DISK_HEALTH_CHECK_PREFIX + "timeout-ms"; + public static final long DEFAULT_NM_DISK_HEALTH_CHECK_TIMEOUT_MS = + 30 * 1000; + /** * The minimum fraction of number of disks to be healthy for the nodemanager * to launch new containers. This applies to nm-local-dirs and nm-log-dirs. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 6508a2a..72ebd08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1325,6 +1325,12 @@ + Timeout of running disk health checker code. + yarn.nodemanager.disk-health-checker.timeout-ms + 30000 + + + The minimum fraction of number of disks to be healthy for the nodemanager to launch new containers. This correspond to both yarn.nodemanager.local-dirs and yarn.nodemanager.log-dirs. i.e. If there diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index a2bfd20..2ee6ace 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -28,7 +28,16 @@ import java.util.List; import java.util.Map; import java.util.Set; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.io.FileUtils; import 
org.apache.commons.lang.RandomStringUtils; @@ -38,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; @@ -51,7 +61,7 @@ * The enum defines disk failure type. */ public enum DiskErrorCause { - DISK_FULL, OTHER + DISK_FULL, TIMEDOUT, OTHER } static class DiskErrorInformation { @@ -99,6 +109,9 @@ private Set dirsChangeListeners; + private ExecutorService asyncTestDirsExecutor; + private ConcurrentHashMap inprogressTestDirs; + /** * Create collection for the directories specified. No check for free space. * @@ -175,6 +188,9 @@ public DirectoryCollection(String[] dirs, utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; dirsChangeListeners = new HashSet(); + + asyncTestDirsExecutor = Executors.newCachedThreadPool(); + inprogressTestDirs = new ConcurrentHashMap(); } synchronized void registerDirsChangeListener( @@ -248,11 +264,13 @@ synchronized boolean createNonExistentDirs(FileContext localFs, * Check the health of current set of local directories(good and failed), * updating the list of valid directories if necessary. * + * @param timeout + * timeout (milliseconds) value * @return true if there is a new disk-failure identified in this * checking or a failed directory passes the disk check false * otherwise. 
*/ - synchronized boolean checkDirs() { + synchronized boolean checkDirs(long timeout) { boolean setChanged = false; Set preCheckGoodDirs = new HashSet(localDirs); Set preCheckFullDirs = new HashSet(fullDirs); @@ -262,7 +280,7 @@ synchronized boolean checkDirs() { DirectoryCollection.concat(localDirs, failedDirs); Map dirsFailedCheck = testDirs(allLocalDirs, - preCheckGoodDirs); + preCheckGoodDirs, timeout); localDirs.clear(); errorDirs.clear(); @@ -276,6 +294,7 @@ synchronized boolean checkDirs() { case DISK_FULL: fullDirs.add(entry.getKey()); break; + case TIMEDOUT: case OTHER: errorDirs.add(entry.getKey()); break; @@ -322,7 +341,87 @@ synchronized boolean checkDirs() { return setChanged; } + synchronized boolean checkDirs() { + return checkDirs(0); + } + + private class AsyncTestDirsCallable implements Callable > { + private List dirs; + private Set goodDirs; + + public AsyncTestDirsCallable(List dirs, Set goodDirs) { + this.dirs = dirs; + this.goodDirs = goodDirs; + } + + private String hashArgs() { + String hash = ""; + for (String s: dirs) { + hash += s; + } + hash += ":"; + for (String s: goodDirs) { + hash += s; + } + return hash; + } + + public Map call() throws Exception { + String hash = hashArgs(); + // debug YARN-4301 + if (hash == null || inprogressTestDirs == null) { + throw new RuntimeException("as you can see, this RuntimeException doesn't happen. 
so there should not be NPE"); } // end debug YARN-4301 // NPE root cause: ConcurrentHashMap.putIfAbsent returns null (not Boolean.FALSE) when there was no previous mapping, so on every first call the assignment below auto-unboxed null into a primitive boolean and threw the "strange" NPE; compare the returned previous value against null instead boolean inprogress; try { inprogress = inprogressTestDirs.putIfAbsent(hash, true) != null; } catch (NullPointerException npe) { throw new RuntimeException("strange NPE", npe); } + if (inprogress) { throw new YarnRuntimeException("testDirs() in-progress: " + dirs + "," + goodDirs); } + Map result = synchronousTestDirs(dirs, goodDirs); + inprogressTestDirs.remove(hash); + return result; + } + } + Map testDirs(List dirs, + Set goodDirs, long timeout) { + if (timeout == 0) { + return synchronousTestDirs(dirs, goodDirs); + } + AsyncTestDirsCallable callable = new AsyncTestDirsCallable(dirs, goodDirs); + Future > future = asyncTestDirsExecutor.submit(callable); + Map result = null; + // loop is needed for Thread.interrupt(); NOTE(review): because the catch below re-interrupts the current thread, the next future.get() will throw InterruptedException immediately and this loop can spin — consider recording the interrupt in a local flag and restoring it only after the loop completes + while ( result == null ) { + try { + result = future.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + throw new YarnRuntimeException(ee); + } catch (TimeoutException te) { + result = new HashMap(); + // we are not sure which directory caused timeout, + // so mark all the directories as timedout + for (String dir: dirs) { + result.put(dir, new DiskErrorInformation(DiskErrorCause.TIMEDOUT, "timedout")); + } + return result; + } + } + return result; + } + + @VisibleForTesting + Map synchronousTestDirs(List dirs, Set goodDirs) { HashMap ret = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 5cc4e19..08a4e96 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -102,6 +102,9 @@ /** when disk health checking code was last run */ private long lastDisksCheckTime; + + /** timeout (milliseconds) value for checking directories */ + private long checkDirsTimeout; private static String FILE_SCHEME = "file"; @@ -159,6 +162,14 @@ public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException { (log != null) ? log : ""); logDirsAllocator = new LocalDirAllocator( NM_GOOD_LOG_DIRS); + + checkDirsTimeout = + conf.getLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_TIMEOUT_MS, + YarnConfiguration.DEFAULT_NM_DISK_HEALTH_CHECK_TIMEOUT_MS); + if (checkDirsTimeout < 0 ) { + throw new YarnRuntimeException("Negative value passed as " + + "yarn.nodemanager.disk-health-checker.timeout-ms"); + } } @Override @@ -446,10 +457,10 @@ private void checkDirs() { Set failedLogDirsPreCheck = new HashSet(logDirs.getFailedDirs()); - if (localDirs.checkDirs()) { + if (localDirs.checkDirs(checkDirsTimeout)) { disksStatusChange = true; } - if (logDirs.checkDirs()) { + if (logDirs.checkDirs(checkDirsTimeout)) { disksStatusChange = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index e529628..29767e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.util.List; import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -333,4 +336,27 @@ public void onDirsChanged() { num++; } } + + @Test + public void testTimeout() throws IOException { + String[] dirs = {testFile.getPath()}; + final CountDownLatch latch = new CountDownLatch(1); + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 0.0F, 100) { + Map synchronousTestDirs(List dirs, + Set goodDirs) { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ie) { + } + latch.countDown(); + return super.synchronousTestDirs(dirs, goodDirs); + } + }; + Assert.assertTrue(dc.checkDirs((long)(0.1 * 1000))); + Assert.assertFalse("checkDirs should return false when timedout", + dc.checkDirs(3 * 1000)); + try { + latch.await(); + } catch (InterruptedException ie) { } + } } -- 2.5.0