diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index b14e1f0..c25e249 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -22,6 +22,8 @@
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.util.*;
@@ -242,7 +244,68 @@ int getCurrentDirectoryIndex() {
     AllocatorPerContext context = obtainContext(contextCfgItemName);
     return context.getCurrentDirectoryIndex();
   }
-  
+
+  /**
+   * Defines the types of supported storage media for local storage.
+   * The default storage medium is assumed to be DISK.
+   */
+  public enum StorageType {
+    SSD,
+    DISK;
+    public static final StorageType DEFAULT = DISK;
+  }
+
+  /**
+   * Encapsulates the URI and storage medium that together describe a
+   * local storage directory.
+   * If no storage medium is specified, it defaults to DISK.
+   */
+  public static class StorageLocation {
+
+    /**
+     * Regular expression that matches a path carrying a storage type prefix,
+     * e.g. [SSD]/absolute/dir or [SSD]relative/dir.
+     */
+    private static final Pattern STORAGE_TYPE_REGEX =
+        Pattern.compile("^\\[(\\w*)\\](.+)$");
+
+    private StorageType storageType;
+
+    private String location;
+
+    public StorageLocation(String pathStr) {
+      this(pathStr, StorageType.DEFAULT);
+    }
+
+    protected StorageLocation(String pathStr, StorageType defaultStorageType) {
+      // Start from the default so an empty prefix such as "[]/dir" no
+      // longer leaves storageType null.
+      this.storageType = defaultStorageType;
+      this.location = pathStr;
+      Matcher matcher = STORAGE_TYPE_REGEX.matcher(pathStr);
+      if (matcher.matches()) {
+        String storageTypeString = matcher.group(1);
+        this.location = matcher.group(2);
+        if (!storageTypeString.isEmpty()) {
+          this.storageType =
+              StorageType.valueOf(storageTypeString.toUpperCase());
+        }
+      }
+    }
+
+    public String getLocation() {
+      return location;
+    }
+
+    public StorageType getStorageType() {
+      return storageType;
+    }
+
+    @Override
+    public String toString() {
+      return (storageType == StorageType.DEFAULT ? ""
+          : "[" + storageType.name() + "]") + location;
+    }
+  }
+
   private static class AllocatorPerContext {
 
     private final Log LOG =
@@ -261,6 +324,7 @@ int getCurrentDirectoryIndex() {
     private DF[] dirDF;
     private Path[] localDirs;
     private String savedLocalDirs;
+    private StorageType[] storageTypes;
 
     public int getAndIncrDirNumLastAccessed() {
       return getAndIncrDirNumLastAccessed(1);
@@ -301,8 +365,14 @@ private Context confChanged(Configuration conf)
       int numDirs = dirStrings.length;
       ArrayList<Path> dirs = new ArrayList<Path>(numDirs);
       ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
+      ArrayList<StorageType> storageTypeList =
+          new ArrayList<StorageType>(numDirs);
       for (int i = 0; i < numDirs; i++) {
         try {
+          // parse the storage type prefix and strip it from the dir string
+          StorageLocation storageLocation =
+              new StorageLocation(dirStrings[i]);
+          dirStrings[i] = storageLocation.getLocation();
           // filter problematic directories
           Path tmpDir = new Path(dirStrings[i]);
           if(ctx.localFS.mkdirs(tmpDir)|| ctx.localFS.exists(tmpDir)) {
@@ -313,6 +383,7 @@ private Context confChanged(Configuration conf)
             DiskChecker.checkDir(tmpFile);
 
             dirs.add(new Path(tmpFile.getPath()));
+            storageTypeList.add(storageLocation.getStorageType());
             dfList.add(new DF(tmpFile, 30000));
           } catch (DiskErrorException de) {
             LOG.warn(dirStrings[i] + " is not writable\n", de);
@@ -325,10 +396,18 @@ private Context confChanged(Configuration conf)
                 ie.getMessage() + "\n", ie);
         } //ignore
       }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Value changed for conf - %s: savedLocalDirs(%s) newLocalDirs(%s)"
+                + " dirs(%s) storageTypes(%s)", contextCfgItemName,
+            ctx.savedLocalDirs, newLocalDirs, dirs.toString(),
+            storageTypeList.toString()));
+      }
       ctx.localDirs = dirs.toArray(new Path[dirs.size()]);
       ctx.dirDF = dfList.toArray(new DF[dirs.size()]);
       ctx.savedLocalDirs = newLocalDirs;
-      
+      ctx.storageTypes =
+          storageTypeList.toArray(new StorageType[storageTypeList.size()]);
       if (dirs.size() > 0) {
         // randomize the first disk picked in the round-robin selection
         ctx.dirNumLastAccessed.set(dirIndexRandomizer.nextInt(dirs.size()));
@@ -377,6 +456,10 @@ public Path getLocalPathForWrite(String pathStr, long size,
       Context ctx = confChanged(conf);
       int numDirs = ctx.localDirs.length;
       int numDirsSearched = 0;
+
+      StorageLocation locationRequirement = new StorageLocation(pathStr);
+      pathStr = locationRequirement.getLocation();
+
       //remove the leading slash from the path (to make sure that the uri
       //resolution results in a valid path on the dir being checked)
       if (pathStr.startsWith("/")) {
@@ -387,12 +470,19 @@ public Path getLocalPathForWrite(String pathStr, long size,
       if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
                                  //proportional to available size
         long[] availableOnDisk = new long[ctx.dirDF.length];
+        long[] availableOnPreferredDisk = new long[ctx.dirDF.length];
         long totalAvailable = 0;
-        
+        long totalAvailablePreferred = 0;
+
         //build the "roulette wheel"
         for(int i =0; i < ctx.dirDF.length; ++i) {
           availableOnDisk[i] = ctx.dirDF[i].getAvailable();
           totalAvailable += availableOnDisk[i];
+          availableOnPreferredDisk[i] = 0;
+          if (ctx.storageTypes[i].equals(locationRequirement.getStorageType())) {
+            availableOnPreferredDisk[i] = ctx.dirDF[i].getAvailable();
+            totalAvailablePreferred += availableOnPreferredDisk[i];
+          }
         }
 
         if (totalAvailable == 0){
@@ -402,27 +492,41 @@ public Path getLocalPathForWrite(String pathStr, long size,
         // Keep rolling the wheel till we get a valid path
         Random r = new java.util.Random();
         while (numDirsSearched < numDirs && returnPath == null) {
-          long randomPosition = (r.nextLong() >>> 1) % totalAvailable;
+          long totalAvailableOnAllOrPreferredDisk = totalAvailable;
+          long[] availableOnAllOrPreferredDisk = availableOnDisk;
+          // prefer to choose the disks of the specified type
+          if (totalAvailablePreferred > 0) {
+            totalAvailableOnAllOrPreferredDisk = totalAvailablePreferred;
+            availableOnAllOrPreferredDisk = availableOnPreferredDisk;
+          }
+          long randomPosition =
+              (r.nextLong() >>> 1) % totalAvailableOnAllOrPreferredDisk;
           int dir = 0;
-          while (randomPosition > availableOnDisk[dir]) {
-            randomPosition -= availableOnDisk[dir];
+          while (randomPosition > availableOnAllOrPreferredDisk[dir]) {
+            randomPosition -= availableOnAllOrPreferredDisk[dir];
             dir++;
           }
           ctx.dirNumLastAccessed.set(dir);
           returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
           if (returnPath == null) {
-            totalAvailable -= availableOnDisk[dir];
+            totalAvailable -= availableOnAllOrPreferredDisk[dir];
             availableOnDisk[dir] = 0; // skip this disk
+            totalAvailablePreferred -= availableOnAllOrPreferredDisk[dir];
+            availableOnPreferredDisk[dir] = 0;
             numDirsSearched++;
           }
         }
       } else {
         int dirNum = ctx.getAndIncrDirNumLastAccessed();
-        while (numDirsSearched < numDirs) {
+        while (numDirsSearched < 2 * numDirs) {
           long capacity = ctx.dirDF[dirNum].getAvailable();
-          if (capacity > size) {
-            returnPath =
-                createPath(ctx.localDirs[dirNum], pathStr, checkWrite);
+          // prefer disks of the specified storage type in the first round,
+          // then fall back to any storage type in the second round
+          if (capacity > size
+              && (numDirsSearched >= numDirs || ctx.storageTypes[dirNum]
+                  .equals(locationRequirement.getStorageType()))) {
+            returnPath =
+                createPath(ctx.localDirs[dirNum], pathStr, checkWrite);
             if (returnPath != null) {
               ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
               break;
@@ -436,7 +540,7 @@ public Path getLocalPathForWrite(String pathStr, long size,
       if (returnPath != null) {
         return returnPath;
       }
-      
+
       //no path found
       throw new DiskErrorException("Could not find any valid local "
           + "directory for " + pathStr);
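
For quick reference, a minimal sketch (not part of the patch) of how the new prefix syntax behaves once this change is applied; the context key "test.dirs" and the /mnt/... paths are invented for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class StorageTypeAllocationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // one SSD-backed dir and one default (DISK) dir
        conf.set("test.dirs", "[SSD]/mnt/ssd0/tmp,/mnt/disk0/tmp");
        LocalDirAllocator allocator = new LocalDirAllocator("test.dirs");

        // an "[SSD]" prefix on the requested path expresses the preference;
        // the allocator strips it before resolving the real path
        Path p = allocator.getLocalPathForWrite("[SSD]app/output.tmp",
            1024L, conf);
        System.out.println(p); // expected to resolve under /mnt/ssd0/tmp

        LocalDirAllocator.StorageLocation loc =
            new LocalDirAllocator.StorageLocation("[SSD]/mnt/ssd0/tmp");
        System.out.println(loc.getStorageType()); // SSD
        System.out.println(loc.getLocation());    // /mnt/ssd0/tmp
      }
    }

A request without a prefix keeps today's behavior, since StorageLocation falls back to StorageType.DEFAULT (DISK).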
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
index 825efe0..cba18a9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
@@ -473,4 +473,137 @@ public void testGetLocalPathForWriteForInvalidPaths() throws Exception {
     }
   }
 
+  /**
+   * Tests requesting local dirs with a specified storage type (SSD/DISK)
+   * across three disk distributions (all-SSD, all-DISK, mixed)
+   * and two request sizes (SMALL_FILE_SIZE and SIZE_UNKNOWN).
+   */
+  @Test(timeout = 30000)
+  public void testGetLocalPathForWriteForStorageType() throws IOException {
+    final LocalDirAllocator.StorageType SSD =
+        LocalDirAllocator.StorageType.SSD;
+    final LocalDirAllocator.StorageType DISK =
+        LocalDirAllocator.StorageType.DISK;
+    /*
+     * prepare test parameters: disk distributions and the expected
+     * storage types for SSD, DISK and DEFAULT requests
+     */
+    // all ssd
+    LocalDirAllocator.StorageType[] allSSDDistribution =
+        new LocalDirAllocator.StorageType[] {
+            SSD, SSD, SSD, SSD, SSD, SSD };
+    LocalDirAllocator.StorageType[] allSSDExpectedResult =
+        new LocalDirAllocator.StorageType[] { SSD, SSD, SSD };
+    // all disk
+    LocalDirAllocator.StorageType[] allDISKDistribution =
+        new LocalDirAllocator.StorageType[] {
+            DISK, DISK, DISK, DISK, DISK, DISK };
+    LocalDirAllocator.StorageType[] allDISKExpectedResult =
+        new LocalDirAllocator.StorageType[] { DISK, DISK, DISK };
+    // mixed
+    LocalDirAllocator.StorageType[] mixedDistribution =
+        new LocalDirAllocator.StorageType[] {
+            DISK, SSD, DISK, DISK, SSD, DISK, DISK,
+            DISK, SSD, DISK, DISK, DISK, DISK, SSD };
+    LocalDirAllocator.StorageType[] mixedExpectedResult =
+        new LocalDirAllocator.StorageType[] { SSD, DISK, DISK };
+
+    LocalDirAllocator.StorageType[][] diskDistributions =
+        new LocalDirAllocator.StorageType[][] {
+            allSSDDistribution, allDISKDistribution, mixedDistribution };
+    LocalDirAllocator.StorageType[][] expectedResults =
+        new LocalDirAllocator.StorageType[][] {
+            allSSDExpectedResult, allDISKExpectedResult,
+            mixedExpectedResult };
+    // prepare the other test parameters
+    int[] testSizes =
+        new int[] { SMALL_FILE_SIZE, LocalDirAllocator.SIZE_UNKNOWN };
+    int testNumForSingleAssert = 20;
+    /*
+     * run the test matrix
+     */
+    try {
+      for (int distributionNo = 0;
+           distributionNo < diskDistributions.length; distributionNo++) {
+        LocalDirAllocator.StorageType[] diskDistribution =
+            diskDistributions[distributionNo];
+        LocalDirAllocator.StorageType[] expectedResult =
+            expectedResults[distributionNo];
+        StringBuffer localDirs = new StringBuffer();
+        for (int i = 0; i < diskDistribution.length; i++) {
+          if (i > 0) {
+            localDirs.append(",");
+          }
+          localDirs.append(diskDistribution[i] == SSD ? "[SSD]" : "")
+              .append(buildBufferDir(ROOT, i));
+        }
+        conf.set(CONTEXT, localDirs.toString());
+        for (int testSize : testSizes) {
+          System.out.println(String.format(
+              "testGetLocalPathForWriteForStorageType - TEST CASE: "
+                  + "diskDistribution(%s) testSize(%d)",
+              Arrays.toString(diskDistribution), testSize));
+          for (int testNo = 0; testNo < testNumForSingleAssert; testNo++) {
+            //Check the SSD requirement
+            Path p1 = dirAllocator
+                .getLocalPathForWrite("[SSD]ssdp1/ssdx", testSize, conf);
+            assertTrue(localFs.getFileStatus(p1.getParent()).isDirectory());
+            int dirIndex1 = Integer.parseInt(
+                p1.getParent().getParent().getName()
+                    .substring(PREFIX.length() - 1));
+            assertEquals(expectedResult[0], diskDistribution[dirIndex1]);
+            //Check the DISK requirement
+            Path p2 = dirAllocator
+                .getLocalPathForWrite("[DISK]hddp2/hddx", testSize, conf);
+            assertTrue(localFs.getFileStatus(p2.getParent()).isDirectory());
+            int dirIndex2 = Integer.parseInt(
+                p2.getParent().getParent().getName()
+                    .substring(PREFIX.length() - 1));
+            assertEquals(expectedResult[1], diskDistribution[dirIndex2]);
+            //Check the DEFAULT requirement
+            Path p3 = dirAllocator
+                .getLocalPathForWrite("hddp3/hddx", testSize, conf);
+            assertTrue(localFs.getFileStatus(p3.getParent()).isDirectory());
+            int dirIndex3 = Integer.parseInt(
+                p3.getParent().getParent().getName()
+                    .substring(PREFIX.length() - 1));
+            assertEquals(expectedResult[2], diskDistribution[dirIndex3]);
+          }
+        }
+      }
+    } finally {
+      Shell.execCommand(
+          Shell.getSetPermissionCommand("u+w", false, BUFFER_DIR_ROOT));
+      rmBufferDirs();
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index 164f19d..4b0852c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -266,8 +266,13 @@ private static void configureLocalDirs(Task task, JobConf job) throws IOException
       // be created below.
     }
     if (workDir == null) {
+      String storageTypePrefix = "";
+      String storageType = System.getenv(Environment.LOCAL_STORAGE_TYPE.name());
+      if (storageType != null && !storageType.trim().isEmpty()) {
+        storageTypePrefix = "[" + storageType + "]";
+      }
       // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
-      workDir = lDirAlloc.getLocalPathForWrite("work", job);
+      workDir = lDirAlloc.getLocalPathForWrite(storageTypePrefix + "work", job);
       FileSystem lfs = FileSystem.getLocal(job).getRaw();
       boolean madeDir = false;
       try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
index e099b8f..a410029 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
@@ -20,8 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
@@ -79,8 +78,9 @@ public Path getOutputFile() throws IOException {
    * @throws IOException
    */
   public Path getOutputFileForWrite(long size) throws IOException {
-    Path attemptOutput =
-        new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    Path attemptOutput =
+        new Path(getStorageTypePrefix() + getAttemptOutputDir(),
+            MAP_OUTPUT_FILENAME_STRING);
     return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
   }
 
@@ -116,8 +116,8 @@ public Path getOutputIndexFile() throws IOException {
    */
   public Path getOutputIndexFileForWrite(long size) throws IOException {
     Path attemptIndexOutput =
-        new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
-            MAP_OUTPUT_INDEX_SUFFIX_STRING);
+        new Path(getStorageTypePrefix() + getAttemptOutputDir(),
+            MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING);
     return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
         size, conf);
   }
@@ -156,9 +156,9 @@ public Path getSpillFile(int spillNumber) throws IOException {
    */
   public Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(SPILL_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+    return lDirAlloc.getLocalPathForWrite(getStorageTypePrefix() + String
+        .format(SPILL_FILE_PATTERN, conf.get(JobContext.TASK_ATTEMPT_ID),
+            spillNumber), size, conf);
   }
 
 /**
@@ -170,7 +170,7 @@ public Path getSpillFileForWrite(int spillNumber, long size)
    */
   public Path getSpillIndexFile(int spillNumber) throws IOException {
     return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_INDEX_FILE_PATTERN, 
+        String.format(SPILL_INDEX_FILE_PATTERN,
             conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
   }
 
@@ -184,9 +184,9 @@ public Path getSpillIndexFile(int spillNumber) throws IOException {
    */
   public Path getSpillIndexFileForWrite(int spillNumber, long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+    return lDirAlloc.getLocalPathForWrite(getStorageTypePrefix() + String
+        .format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
   }
 
 /**
@@ -210,10 +210,9 @@ public Path getInputFile(int mapId) throws IOException {
    */
   public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
       long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING,
-        getAttemptOutputDir().toString(), mapId.getId()),
-        size, conf);
+    return lDirAlloc.getLocalPathForWrite(getStorageTypePrefix() + String
+        .format(REDUCE_INPUT_FILE_FORMAT_STRING,
+            getAttemptOutputDir().toString(), mapId.getId()), size, conf);
   }
 
   /** Removes all of the files related to a task. */
@@ -234,5 +233,12 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
-  
+
+  private String getStorageTypePrefix() {
+    String storageType = getConf().get(MRJobConfig.MR_LOCAL_STORAGE_TYPE);
+    if (storageType != null && !storageType.trim().isEmpty()) {
+      return "[" + storageType + "]";
+    }
+    return "";
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 4305824..8a706c2 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -904,6 +904,25 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
         MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf
     );
 
+    // Set up the environment variables for the storage type of the
+    // local-dirs and log-dirs
+    String localStorageType = conf.get(MRJobConfig.MR_LOCAL_STORAGE_TYPE);
+    if (localStorageType != null && !localStorageType.isEmpty()) {
+      environment.put(Environment.LOCAL_STORAGE_TYPE.name(), localStorageType);
+    }
+    String logStorageType = conf.get(MRJobConfig.MR_LOG_STORAGE_TYPE);
+    if (logStorageType != null && !logStorageType.isEmpty()) {
+      environment.put(Environment.LOG_STORAGE_TYPE.name(), logStorageType);
+    }
+    if (conf.get(MRJobConfig.MR_ENSURE_LOCAL_STORAGE_TYPE) != null) {
+      environment.put(Environment.ENSURE_LOCAL_STORAGE_TYPE.name(),
+          conf.get(MRJobConfig.MR_ENSURE_LOCAL_STORAGE_TYPE));
+    }
+    if (conf.get(MRJobConfig.MR_ENSURE_LOG_STORAGE_TYPE) != null) {
+      environment.put(Environment.ENSURE_LOG_STORAGE_TYPE.name(),
+          conf.get(MRJobConfig.MR_ENSURE_LOG_STORAGE_TYPE));
+    }
+
     // Construct the actual Container
     // The null fields are per-container and will be constructed for each
     // container separately.
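
On the MapReduce side, a job opts in through the MRJobConfig keys this patch adds (see the MRJobConfig hunk below). A hedged client-side sketch; the values are illustrative and assume the patch is applied on both client and cluster:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class StorageTypeJobConfSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // keep intermediate/local data on SSD, logs on ordinary disks
        job.set(MRJobConfig.MR_LOCAL_STORAGE_TYPE, "SSD");
        job.set(MRJobConfig.MR_LOG_STORAGE_TYPE, "DISK");
        // fail container launch rather than silently fall back from SSD
        job.set(MRJobConfig.MR_ENSURE_LOCAL_STORAGE_TYPE, "true");
      }
    }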
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java
index cb480a8..7630eb8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java
@@ -68,8 +68,9 @@ public Path getOutputFile()
   @Override
   public Path getOutputFileForWrite(long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, size, getConf());
+    return lDirAlloc.getLocalPathForWrite(
+        getStorageTypePrefix() + MRJobConfig.OUTPUT + Path.SEPARATOR
+            + MAP_OUTPUT_FILENAME_STRING, size, getConf());
   }
 
 /**
@@ -104,9 +105,10 @@ public Path getOutputIndexFile()
   @Override
   public Path getOutputIndexFileForWrite(long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
-        size, getConf());
+    return lDirAlloc.getLocalPathForWrite(
+        getStorageTypePrefix() + MRJobConfig.OUTPUT + Path.SEPARATOR
+            + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, size,
+        getConf());
   }
 
 /**
@@ -143,8 +145,9 @@ public Path getSpillFile(int spillNumber)
   @Override
   public Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
-        + spillNumber + ".out", size, getConf());
+    return lDirAlloc.getLocalPathForWrite(
+        getStorageTypePrefix() + MRJobConfig.OUTPUT + "/spill" + spillNumber
+            + ".out", size, getConf());
   }
 
 /**
@@ -172,8 +175,9 @@ public Path getSpillIndexFile(int spillNumber)
   @Override
   public Path getSpillIndexFileForWrite(int spillNumber, long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
-        + spillNumber + ".out.index", size, getConf());
+    return lDirAlloc.getLocalPathForWrite(
+        getStorageTypePrefix() + MRJobConfig.OUTPUT + "/spill" + spillNumber
+            + ".out.index", size, getConf());
   }
 
 /**
@@ -203,9 +207,9 @@ public Path getInputFile(int mapId)
   public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
       long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()),
-        size, getConf());
+    return lDirAlloc.getLocalPathForWrite(getStorageTypePrefix() + String
+        .format(REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT,
+            mapId.getId()), size, getConf());
   }
 
   /** Removes all of the files related to a task.
    */
@@ -223,4 +227,12 @@ public void setConf(Configuration conf) {
     super.setConf(conf);
   }
 
+  private String getStorageTypePrefix() {
+    String storageType = getConf().get(MRJobConfig.MR_LOCAL_STORAGE_TYPE);
+    if (storageType != null && !storageType.trim().isEmpty()) {
+      return "[" + storageType + "]";
+    }
+    return "";
+  }
+
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5716404..15b794b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -756,6 +756,15 @@
       "LD_LIBRARY_PATH=" + Apps.crossPlatformify("HADOOP_COMMON_HOME") +
       "/lib/native";
 
+  public static final String MR_LOCAL_STORAGE_TYPE =
+      "mapreduce.job.local-storage-type";
+  public static final String MR_LOG_STORAGE_TYPE =
+      "mapreduce.job.log-storage-type";
+  public static final String MR_ENSURE_LOCAL_STORAGE_TYPE =
+      "mapreduce.job.ensure-local-storage-type";
+  public static final String MR_ENSURE_LOG_STORAGE_TYPE =
+      "mapreduce.job.ensure-log-storage-type";
+
   public static final String MR_AM_PROFILE = MR_AM_PREFIX + "profile";
   public static final boolean DEFAULT_MR_AM_PROFILE = false;
   public static final String MR_AM_PROFILE_PARAMS = MR_AM_PREFIX
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 4c6f0f3..902ab83 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -483,6 +483,28 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
     MRApps.setEnvFromInputString(environment,
         conf.get(MRJobConfig.MR_AM_ENV), conf);
 
+    // Set up the environment variables for the storage type of the
+    // local and log dirs
+    String localStorageType = conf.get(MRJobConfig.MR_LOCAL_STORAGE_TYPE);
+    if (localStorageType != null && !localStorageType.isEmpty()) {
+      environment.put(Environment.LOCAL_STORAGE_TYPE.name(), localStorageType);
+    }
+    String logStorageType = conf.get(MRJobConfig.MR_LOG_STORAGE_TYPE);
+    if (logStorageType != null && !logStorageType.isEmpty()) {
+      environment.put(Environment.LOG_STORAGE_TYPE.name(), logStorageType);
+    }
+    String ensureLocalStorageType =
+        conf.get(MRJobConfig.MR_ENSURE_LOCAL_STORAGE_TYPE);
+    if (ensureLocalStorageType != null && !ensureLocalStorageType.isEmpty()) {
+      environment.put(Environment.ENSURE_LOCAL_STORAGE_TYPE.name(),
+          ensureLocalStorageType);
+    }
+    String ensureLogStorageType =
+        conf.get(MRJobConfig.MR_ENSURE_LOG_STORAGE_TYPE);
+    if (ensureLogStorageType != null && !ensureLogStorageType.isEmpty()) {
+      environment.put(Environment.ENSURE_LOG_STORAGE_TYPE.name(),
+          ensureLogStorageType);
+    }
+
     // Parse distributed cache
     MRApps.setupDistributedCache(jobConf, localResources);
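
For the conf sketched earlier, the environment that YARNRunner (and TaskAttemptImpl for task containers) populates would look roughly like the map below. This is illustrative only and uses the Environment constants added by this patch:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;

    public class EnvPropagationSketch {
      public static void main(String[] args) {
        Map<String, String> environment = new HashMap<String, String>();
        environment.put(Environment.LOCAL_STORAGE_TYPE.name(), "SSD");
        environment.put(Environment.ENSURE_LOCAL_STORAGE_TYPE.name(), "true");
        // a task JVM later reads the value back from its real process
        // environment, as YarnChild does above
        String storageType =
            System.getenv(Environment.LOCAL_STORAGE_TYPE.name());
        System.out.println(environment + " / task sees: " + storageType);
      }
    }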
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
index 760e251..1ed2ad0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
@@ -241,7 +241,33 @@
      * Comma separate list of directories that the container should use for
      * logging.
      */
-    LOG_DIRS("LOG_DIRS");
+    LOG_DIRS("LOG_DIRS"),
+
+    /**
+     * $LOCAL_STORAGE_TYPE
+     * Use SSD/DISK to store the local files of the application.
+     */
+    LOCAL_STORAGE_TYPE("LOCAL_STORAGE_TYPE"),
+
+    /**
+     * $LOG_STORAGE_TYPE
+     * Use SSD/DISK to store the log files of the application.
+     */
+    LOG_STORAGE_TYPE("LOG_STORAGE_TYPE"),
+
+    /**
+     * $ENSURE_LOCAL_STORAGE_TYPE
+     * If true and the local dirs fall back to an undesired storage
+     * medium, fail to launch the container.
+     */
+    ENSURE_LOCAL_STORAGE_TYPE("ENSURE_LOCAL_STORAGE_TYPE"),
+
+    /**
+     * $ENSURE_LOG_STORAGE_TYPE
+     * If true and the log dirs fall back to an undesired storage
+     * medium, fail to launch the container.
+     */
+    ENSURE_LOG_STORAGE_TYPE("ENSURE_LOG_STORAGE_TYPE");
 
     private final String variable;
     private Environment(String variable) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index f8cb4ee..8947311 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -22,8 +22,10 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -107,6 +109,9 @@
 
   private NodeManagerMetrics nodeManagerMetrics = null;
 
+  private static Map<String, LocalDirAllocator.StorageLocation>
+      localStorageLocationMap;
+
   /**
    * Class which is used by the {@link Timer} class to periodically execute the
    * disks' health checker code.
@@ -134,6 +139,22 @@ public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
       long minFreeSpacePerDiskMB =
           conf.getLong(YarnConfiguration.NM_MIN_PER_DISK_FREE_SPACE_MB,
               YarnConfiguration.DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB);
+      String[] localDirConfigs =
+          conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS);
+      localStorageLocationMap =
+          new HashMap<String, LocalDirAllocator.StorageLocation>();
+      for (String localDirConfig : localDirConfigs) {
+        LocalDirAllocator.StorageLocation location =
+            new LocalDirAllocator.StorageLocation(localDirConfig);
+        localStorageLocationMap.put(location.getLocation(), location);
+      }
+      String[] logDirConfigs =
+          conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS);
+      for (String logDirConfig : logDirConfigs) {
+        LocalDirAllocator.StorageLocation location =
+            new LocalDirAllocator.StorageLocation(logDirConfig);
+        localStorageLocationMap.put(location.getLocation(), location);
+      }
       localDirs = new DirectoryCollection(
           validatePaths(conf
@@ -433,12 +454,14 @@ private boolean isInGoodDirs(List<String> goodDirs, String path) {
 
   private void updateDirsAfterTest() {
     Configuration conf = getConfig();
-    List<String> localDirs = getLocalDirs();
+    List<String> typePrefixLocalDirs =
+        getLocalPathsWithStorageTypePrefix(getLocalDirs());
     conf.setStrings(NM_GOOD_LOCAL_DIRS,
-        localDirs.toArray(new String[localDirs.size()]));
-    List<String> logDirs = getLogDirs();
+        typePrefixLocalDirs.toArray(new String[typePrefixLocalDirs.size()]));
+    List<String> typePrefixLogDirs =
+        getLocalPathsWithStorageTypePrefix(getLogDirs());
     conf.setStrings(NM_GOOD_LOG_DIRS,
-        logDirs.toArray(new String[logDirs.size()]));
+        typePrefixLogDirs.toArray(new String[typePrefixLogDirs.size()]));
     if (!areDisksHealthy()) {
       // Just log.
       LOG.error("Most of the disks failed. " + getDisksHealthReport(false));
@@ -566,7 +589,7 @@ public Path getLocalPathForWrite(String pathStr) throws IOException {
   public Path getLocalPathForWrite(String pathStr, long size,
       boolean checkWrite) throws IOException {
     return localDirsAllocator.getLocalPathForWrite(pathStr, size, getConfig(),
-      checkWrite);
+        checkWrite);
   }
 
   public Path getLocalPathForRead(String pathStr) throws IOException {
@@ -587,7 +610,10 @@ public Path getLogPathToRead(String pathStr) throws IOException {
     ArrayList<String> validPaths = new ArrayList<String>();
     for (int i = 0; i < paths.length; ++i) {
       try {
-        URI uriPath = (new Path(paths[i])).toUri();
+        //support a path with a storage type prefix
+        LocalDirAllocator.StorageLocation storageLocation =
+            new LocalDirAllocator.StorageLocation(paths[i]);
+        URI uriPath = (new Path(storageLocation.getLocation())).toUri();
         if (uriPath.getScheme() == null
             || uriPath.getScheme().equals(FILE_SCHEME)) {
           validPaths.add(new Path(uriPath.getPath()).toString());
@@ -620,4 +646,43 @@ protected void updateMetrics() {
           logDirs.getGoodDirsDiskUtilizationPercentage());
     }
   }
+
+  public Path getLocalPathWithStorageTypePrefix(String pathStr) {
+    if (pathStr == null) {
+      return null;
+    }
+    pathStr = pathStr.endsWith("/") ? pathStr : pathStr + "/";
+    LocalDirAllocator.StorageType storageType = getStorageType(pathStr);
+    if (storageType != null
+        && storageType != LocalDirAllocator.StorageType.DEFAULT) {
+      // add the storage type prefix for a non-default storage type
+      return new Path(String.format("[%s]%s", storageType.name(), pathStr));
+    }
+    return new Path(pathStr);
+  }
+
+  private List<String> getLocalPathsWithStorageTypePrefix(
+      List<String> nonTypePrefixLocalDirs) {
+    List<String> localStorageLocations = new ArrayList<String>();
+    for (String nonTypePrefixLocalDir : nonTypePrefixLocalDirs) {
+      localStorageLocations.add(
+          getLocalPathWithStorageTypePrefix(nonTypePrefixLocalDir).toString());
+    }
+    return localStorageLocations;
+  }
+
+  public LocalDirAllocator.StorageType getStorageType(String pathStr) {
+    if (pathStr == null) {
+      return null;
+    }
+    pathStr = pathStr.endsWith("/") ? pathStr : pathStr + "/";
+    for (Map.Entry<String, LocalDirAllocator.StorageLocation> entry
+        : localStorageLocationMap.entrySet()) {
+      String localDir = entry.getKey();
+      String localDirRootPath =
+          localDir.endsWith("/") ? localDir : localDir + "/";
+      if (pathStr.startsWith(localDirRootPath)) {
+        return entry.getValue().getStorageType();
+      }
+    }
+    return null;
+  }
 }
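
A hypothetical use of the two new helpers, assuming yarn.nodemanager.local-dirs was set to "[SSD]/grid/ssd/nm-local-dir,/grid/hdd/nm-local-dir" and that "handler" is the NM's LocalDirsHandlerService instance:

    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;

    public class StorageTypeLookupSketch {
      static void demo(LocalDirsHandlerService handler) {
        // resolves to SSD: the path sits under the SSD-prefixed local dir
        LocalDirAllocator.StorageType type =
            handler.getStorageType("/grid/ssd/nm-local-dir/usercache");
        // re-attaches the prefix so later allocator calls keep the
        // preference: "[SSD]/grid/ssd/nm-local-dir/usercache/"
        Path prefixed = handler.getLocalPathWithStorageTypePrefix(
            "/grid/ssd/nm-local-dir/usercache");
        System.out.println(type + " " + prefixed);
      }
    }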
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index d8239ef..6d50d33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -171,14 +171,18 @@ public Integer call() {
     Map<Path, List<String>> localResources = getLocalizedResources();
     final String user = container.getUser();
+    String logPathPrefix = getStorageTypePrefix(launchContext,
+        Environment.LOG_STORAGE_TYPE.name());
+    String localPathPrefix = getStorageTypePrefix(launchContext,
+        Environment.LOCAL_STORAGE_TYPE.name());
     // /////////////////////////// Variable expansion
     // Before the container script gets written out.
     List<String> newCmds = new ArrayList<String>(command.size());
     String appIdStr = app.getAppId().toString();
     String relativeContainerLogDir = ContainerLaunch
         .getRelativeContainerLogDir(appIdStr, containerIdStr);
-    containerLogDir =
-        dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
+    containerLogDir = dirsHandler
+        .getLogPathForWrite(logPathPrefix + relativeContainerLogDir, false);
     recordContainerLogDir(containerID, containerLogDir.toString());
     for (String str : command) {
       // TODO: Should we instead work via symlinks without this grammar?
@@ -197,36 +201,56 @@ public Integer call() {
 
     FileContext lfs = FileContext.getLocalFSFileContext();
 
-    Path nmPrivateContainerScriptPath =
-        dirsHandler.getLocalPathForWrite(
-            getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
-                + CONTAINER_SCRIPT);
-    Path nmPrivateTokensPath =
-        dirsHandler.getLocalPathForWrite(
-            getContainerPrivateDir(appIdStr, containerIdStr)
-                + Path.SEPARATOR
-                + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
-                    containerIdStr));
-    Path nmPrivateClasspathJarDir =
-        dirsHandler.getLocalPathForWrite(
-            getContainerPrivateDir(appIdStr, containerIdStr));
+    Path nmPrivateContainerScriptPath = dirsHandler.getLocalPathForWrite(
+        localPathPrefix + getContainerPrivateDir(appIdStr, containerIdStr)
+            + Path.SEPARATOR + CONTAINER_SCRIPT);
+    Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
+        localPathPrefix + getContainerPrivateDir(appIdStr, containerIdStr)
+            + Path.SEPARATOR + String
+            .format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, containerIdStr));
+    Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
+        localPathPrefix + getContainerPrivateDir(appIdStr, containerIdStr));
     DataOutputStream containerScriptOutStream = null;
     DataOutputStream tokensOutStream = null;
 
     // Select the working directory for the container
-    Path containerWorkDir =
-        dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
-            + Path.SEPARATOR + user + Path.SEPARATOR
-            + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
-            + Path.SEPARATOR + containerIdStr,
-            LocalDirAllocator.SIZE_UNKNOWN, false);
+    Path containerWorkDir = dirsHandler.getLocalPathForWrite(
+        localPathPrefix + ContainerLocalizer.USERCACHE + Path.SEPARATOR + user
+            + Path.SEPARATOR + ContainerLocalizer.APPCACHE + Path.SEPARATOR
+            + appIdStr + Path.SEPARATOR + containerIdStr,
+        LocalDirAllocator.SIZE_UNKNOWN, false);
     recordContainerWorkDir(containerID, containerWorkDir.toString());
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Got local dirs for %s : workDir(%s) logDir(%s)",
+          containerID, containerWorkDir, containerLogDir));
+    }
+
+    //ensure the storage type of the container work dir
+    if (environment.containsKey(Environment.LOCAL_STORAGE_TYPE.name())
+        && environment
+            .containsKey(Environment.ENSURE_LOCAL_STORAGE_TYPE.name())
+        && Boolean.parseBoolean(
+            environment.get(Environment.ENSURE_LOCAL_STORAGE_TYPE.name()))) {
+      String requirement =
+          environment.get(Environment.LOCAL_STORAGE_TYPE.name());
+      checkStorageType(containerWorkDir.toString(), requirement);
+    }
+    //ensure the storage type of the container log dir
+    if (environment.containsKey(Environment.LOG_STORAGE_TYPE.name())
+        && environment.containsKey(Environment.ENSURE_LOG_STORAGE_TYPE.name())
+        && Boolean.parseBoolean(
+            environment.get(Environment.ENSURE_LOG_STORAGE_TYPE.name()))) {
+      String requirement =
+          environment.get(Environment.LOG_STORAGE_TYPE.name());
+      checkStorageType(containerLogDir.toString(), requirement);
+    }
+
     String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
 
     // pid file should be in nm private dir so that it is not
     // accessible by users
-    pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
+    pidFilePath =
+        dirsHandler.getLocalPathForWrite(localPathPrefix + pidFileSubpath);
     List<String> localDirs = dirsHandler.getLocalDirs();
     List<String> logDirs = dirsHandler.getLogDirs();
     List<String> filecacheDirs = getNMFilecacheDirs(localDirs);
@@ -248,6 +272,8 @@ public Integer call() {
         Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
         Path userdir = new Path(usersdir, user);
         Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+        appsdir = dirsHandler.getLocalPathWithStorageTypePrefix(
+            appsdir.toString()); // add the storage type prefix
         appDirs.add(new Path(appsdir, appIdStr));
       }
       containerScriptOutStream =
@@ -311,6 +337,18 @@ public Integer call() {
     return ret;
   }
 
+  private void checkStorageType(String pathStr, String requirement)
+      throws IOException {
+    LocalDirAllocator.StorageType storageType =
+        dirsHandler.getStorageType(pathStr);
+    // guard against a null storage type for paths outside the known dirs
+    if (storageType == null || !storageType.name().equals(requirement)) {
+      throw new IOException(String.format(
+          "Failed to launch container: the specified storage type (%s) "
+              + "for %s cannot be satisfied (got %s)!", requirement, pathStr,
+          storageType == null ? "none" : storageType.name()));
+    }
+  }
+
   @SuppressWarnings("unchecked")
   protected boolean validateContainerState() {
     // CONTAINER_KILLED_ON_REQUEST should not be missed if the container
@@ -787,6 +825,15 @@ private String getAppPrivateDir(String appIdStr) {
         + appIdStr;
   }
 
+  private String getStorageTypePrefix(ContainerLaunchContext launchContext,
+      String key) {
+    String storageTypeStr = launchContext.getEnvironment().get(key);
+    if (storageTypeStr != null && !storageTypeStr.isEmpty()) {
+      return "[" + storageTypeStr + "]";
+    }
+    return "";
+  }
+
   Context getContext() {
     return context;
   }
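
Tying the pieces together, an end-to-end sketch (hypothetical job, not part of the patch): the conf keys travel via the container environment, and checkStorageType above fails the launch with an IOException when the ensure flag is set and the requested medium cannot be provided.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EnsureStorageTypeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(MRJobConfig.MR_LOCAL_STORAGE_TYPE, "SSD");
        conf.set(MRJobConfig.MR_ENSURE_LOCAL_STORAGE_TYPE, "true");
        Job job = Job.getInstance(conf, "ssd-scratch-job");
        // ... set input/output/mapper/reducer as usual ...
        // containers land on SSD-backed dirs or fail fast at launch
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }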