diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index c1e9d21ecc7..14368ea5cae 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -163,9 +163,15 @@ public Path getLocalPathForWrite(String pathStr, long size,
   public Path getLocalPathToRead(String pathStr,
       Configuration conf) throws IOException {
     AllocatorPerContext context = obtainContext(contextCfgItemName);
-    return context.getLocalPathToRead(pathStr, conf);
+    return context.getLocalPathToRead(pathStr, conf, true);
   }
-
+
+  public Path getLocalPathToRead(String pathStr,
+      Configuration conf, boolean shouldFilter) throws IOException {
+    AllocatorPerContext context = obtainContext(contextCfgItemName);
+    return context.getLocalPathToRead(pathStr, conf, shouldFilter);
+  }
+
   /**
    * Get all of the paths that currently exist in the working directories.
    * @param pathStr the path underneath the roots
@@ -288,7 +294,7 @@ public AllocatorPerContext(String contextCfgItemName) {
     /** This method gets called everytime before any read/write to make sure
      * that any change to localDirs is reflected immediately.
      */
-    private Context confChanged(Configuration conf)
+    private Context confChanged(Configuration conf, boolean shouldFilter)
         throws IOException {
       Context ctx = currentContext.get();
       String newLocalDirs = conf.get(contextCfgItemName);
@@ -306,20 +312,14 @@ private Context confChanged(Configuration conf)
           try {
             // filter problematic directories
             Path tmpDir = new Path(dirStrings[i]);
-            if(ctx.localFS.mkdirs(tmpDir)|| ctx.localFS.exists(tmpDir)) {
-              try {
-                File tmpFile = tmpDir.isAbsolute()
-                    ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
-                    : new File(dirStrings[i]);
-
-                DiskChecker.checkDir(tmpFile);
-                dirs.add(new Path(tmpFile.getPath()));
-                dfList.add(new DF(tmpFile, 30000));
-              } catch (DiskErrorException de) {
-                LOG.warn(dirStrings[i] + " is not writable\n", de);
-              }
+            File tmpFile = tmpDir.isAbsolute()
+                ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
+                : new File(dirStrings[i]);
+            if (shouldFilter) {
+              checkDirsAndFilter(ctx, dirStrings, dirs, dfList, i, tmpDir,
+                  tmpFile);
             } else {
-              LOG.warn("Failed to create " + dirStrings[i]);
+              addToDirsList(dirs, dfList, tmpFile);
             }
           } catch (IOException ie) {
             LOG.warn("Failed to create " + dirStrings[i] + ": " +
@@ -341,6 +341,28 @@ private Context confChanged(Configuration conf)
       return ctx;
     }
 
+    private void checkDirsAndFilter(AllocatorPerContext.Context ctx,
+        String[] dirStrings, ArrayList<Path> dirs,
+        ArrayList<DF> dfList, int i, Path tmpDir, File tmpFile)
+        throws IOException {
+      if (ctx.localFS.mkdirs(tmpDir) || ctx.localFS.exists(tmpDir)) {
+        try {
+          DiskChecker.checkDir(tmpFile);
+          addToDirsList(dirs, dfList, tmpFile);
+        } catch (DiskErrorException de) {
+          LOG.warn(dirStrings[i] + " is not writable\n", de);
+        }
+      } else {
+        LOG.warn("Failed to create " + dirStrings[i]);
+      }
+    }
+
+    private void addToDirsList(ArrayList<Path> dirs, ArrayList<DF> dfList,
+        File tmpFile) throws IOException {
+      dirs.add(new Path(tmpFile.getPath()));
+      dfList.add(new DF(tmpFile, 30000));
+    }
+
     private Path createPath(Path dir, String path,
         boolean checkWrite) throws IOException {
       Path file = new Path(dir, path);
@@ -375,7 +397,7 @@ int getCurrentDirectoryIndex() {
      */
     public Path getLocalPathForWrite(String pathStr, long size,
         Configuration conf, boolean checkWrite) throws IOException {
-      Context ctx = confChanged(conf);
+      Context ctx = confChanged(conf, true);
       int numDirs = ctx.localDirs.length;
       int numDirsSearched = 0;
       //remove the leading slash from the path (to make sure that the uri
@@ -468,8 +490,8 @@ public File createTmpFileForWrite(String pathStr, long size,
      *  path to the file when we find one
      */
     public Path getLocalPathToRead(String pathStr,
-        Configuration conf) throws IOException {
-      Context ctx = confChanged(conf);
+        Configuration conf, boolean shouldFilter) throws IOException {
+      Context ctx = confChanged(conf, shouldFilter);
       int numDirs = ctx.localDirs.length;
       int numDirsSearched = 0;
       //remove the leading slash from the path (to make sure that the uri
@@ -554,7 +576,7 @@ public void remove() {
      */
     Iterable<Path> getAllLocalPathsToRead(String pathStr,
         Configuration conf) throws IOException {
-      Context ctx = confChanged(conf);
+      Context ctx = confChanged(conf, false);
       if (pathStr.startsWith("/")) {
         pathStr = pathStr.substring(1);
       }
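The hunks above change LocalDirAllocator's read path: the existing two-arg getLocalPathToRead() now delegates with shouldFilter=true (preserving the old behavior), a new three-arg overload exposes the flag, and getAllLocalPathsToRead() always passes false. Below is a minimal caller-side sketch of the new overload; it is illustrative only, and the class name, context key, and file name are assumptions rather than part of the patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

public class ReadPathSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Assumed context key; any comma-separated list-of-dirs property works.
    LocalDirAllocator alloc = new LocalDirAllocator("yarn.nodemanager.local-dirs");

    // New three-arg overload: shouldFilter=false makes confChanged() skip the
    // mkdirs()/DiskChecker validation, so every configured directory stays in
    // the search list and a disk gone read-only can still serve existing files.
    Path unchecked = alloc.getLocalPathToRead("output/file.out.index", conf, false);

    // The two-arg form is unchanged; it delegates with shouldFilter=true. Note
    // that the resolved directory list is cached until the configured dirs
    // change, so mixing both forms on one Configuration reuses whichever list
    // was built first.
    Path checked = alloc.getLocalPathToRead("output/file.out.index", conf);

    System.out.println(unchecked + " / " + checked);
  }
}

Both lookups throw a DiskErrorException when the file is absent from every configured directory, so a real caller would guard them accordingly.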
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 863da7efde9..3fc2d0f2720 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -888,10 +888,13 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception {
           String base = getBaseLocation(key.jobId, key.user);
           String attemptBase = base + key.attemptId;
+          boolean shouldFilter = conf.getBoolean(
+              YarnConfiguration.YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED,
+              YarnConfiguration.YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED_DEFAULT);
           Path indexFileName = lDirAlloc.getLocalPathToRead(
-              attemptBase + "/" + INDEX_FILE_NAME, conf);
+              attemptBase + "/" + INDEX_FILE_NAME, conf, shouldFilter);
           Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-              attemptBase + "/" + DATA_FILE_NAME, conf);
+              attemptBase + "/" + DATA_FILE_NAME, conf, shouldFilter);
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("Loaded : " + key + " via loader");
@@ -1360,6 +1363,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
         sendError(ctx, INTERNAL_SERVER_ERROR);
       }
     }
+
+    @VisibleForTesting
+    LocalDirAllocator getlDirAlloc() {
+      return lDirAlloc;
+    }
+
   }
 
   static class AttemptPathInfo {
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 849ce1a1563..f17cccbc0d7 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -744,6 +744,94 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
     }
   }
 
+  @Test
+  public void testBadDirs() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        YarnConfiguration.YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED, false);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir1 = new File("target",
+        TestShuffleHandler.class.getSimpleName() + "LocDir/").getAbsoluteFile();
+    File absLogDir2 = new File("target1",
+        TestShuffleHandler.class.getSimpleName() + "/LocDir2/");
+    String localDirs = absLogDir1.getAbsolutePath() + "," + absLogDir2;
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDirs);
+    LOG.info(appId.toString());
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    String reducerId = "0";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir1, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    createShuffleHandlerFiles(absLogDir2, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    absLogDir1.setWritable(false);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+            // Do nothing.
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+              "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler
+          .initializeApplication(new ApplicationInitializationContext(user,
+              appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+                  outputBuffer.getLength())));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+                  + "&map=attempt_12345_1_m_1_0");
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conn.connect();
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        Path path = shuffleHandler.getShuffle(conf).getlDirAlloc().
+            getLocalPathToRead(
+                "usercache/randomUser/appcache/application_12345_0001/"
+                    + "output/attempt_12345_1_m_1_0/file.out.index", conf);
+        Assert.assertTrue(
+            "Path did not match!", path.toString().startsWith(
+                absLogDir1.getPath()));
+      } catch (EOFException e) {
+        // ignore
+      }
+    } finally {
+      absLogDir1.setWritable(true);
+      shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir1);
+      FileUtil.fullyDelete(absLogDir2);
+    }
+  }
+
   private static void createShuffleHandlerFiles(File logDir, String user,
       String appId, String appAttemptId, Configuration conf,
       List<File> fileMap) throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 114453f6dc9..f9f00895b21 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2744,6 +2744,10 @@ public static boolean isAclEnabled(Configuration conf) {
       SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
   public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
 
+  public static final String YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED =
+      YARN_PREFIX + "shuffle.bad.dirs.filter.enabled";
+  public static final boolean YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED_DEFAULT =
+      true;
   ////////////////////////////////
   // Federation Configs
   ////////////////////////////////
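Taken together, the patch makes the bad-dirs filter switchable: yarn.shuffle.bad.dirs.filter.enabled defaults to true (the pre-patch filtering behavior), and setting it to false lets the shuffle handler keep serving intermediate data from a local dir that has become non-writable but is still readable. Below is a hedged sketch of toggling it programmatically, equivalent to setting the property in yarn-site.xml; the snippet compiles only with this patch applied, since the constants are introduced here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class BadDirsFilterToggle {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // false = keep serving from dirs that fail the DiskChecker writability
    // probe; true (the default) restores the old filtering behavior.
    conf.setBoolean(YarnConfiguration.YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED, false);
    System.out.println("bad-dirs filter enabled: " + conf.getBoolean(
        YarnConfiguration.YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED,
        YarnConfiguration.YARN_SHUFFLE_BAD_DIRS_FILTER_ENABLED_DEFAULT));
  }
}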