diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java index 1c216f430af..6abc589e331 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java @@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.util.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -78,11 +76,19 @@ /** Used when size of file to be allocated is unknown. */ public static final int SIZE_UNKNOWN = -1; + private final DiskValidator diskValidator; + /**Create an allocator object * @param contextCfgItemName */ public LocalDirAllocator(String contextCfgItemName) { this.contextCfgItemName = contextCfgItemName; + try { + this.diskValidator = DiskValidatorFactory.getInstance( + BasicDiskValidator.NAME); + } catch (DiskErrorException e) { + throw new RuntimeException(e); + } } /** This method must be used to obtain the dir allocation context for a @@ -91,12 +97,14 @@ public LocalDirAllocator(String contextCfgItemName) { * dir allocations (e.g., mapred.local.dir). The method will * create a context for that name if it doesn't already exist. */ - private AllocatorPerContext obtainContext(String contextCfgItemName) { + private AllocatorPerContext obtainContext(String contextCfgItemName, + DiskValidator diskValidator) { synchronized (contexts) { AllocatorPerContext l = contexts.get(contextCfgItemName); if (l == null) { contexts.put(contextCfgItemName, - (l = new AllocatorPerContext(contextCfgItemName))); + (l = new AllocatorPerContext(contextCfgItemName, + diskValidator))); } return l; } @@ -148,7 +156,8 @@ public Path getLocalPathForWrite(String pathStr, long size, public Path getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite) throws IOException { - AllocatorPerContext context = obtainContext(contextCfgItemName); + AllocatorPerContext context = obtainContext(contextCfgItemName, + diskValidator); return context.getLocalPathForWrite(pathStr, size, conf, checkWrite); } @@ -162,7 +171,8 @@ public Path getLocalPathForWrite(String pathStr, long size, */ public Path getLocalPathToRead(String pathStr, Configuration conf) throws IOException { - AllocatorPerContext context = obtainContext(contextCfgItemName); + AllocatorPerContext context = obtainContext(contextCfgItemName, + diskValidator); return context.getLocalPathToRead(pathStr, conf); } @@ -178,7 +188,7 @@ public Path getLocalPathToRead(String pathStr, ) throws IOException { AllocatorPerContext context; synchronized (this) { - context = obtainContext(contextCfgItemName); + context = obtainContext(contextCfgItemName, diskValidator); } return context.getAllLocalPathsToRead(pathStr, conf); } @@ -196,7 +206,8 @@ public Path getLocalPathToRead(String pathStr, */ public File createTmpFileForWrite(String pathStr, long size, Configuration conf) throws IOException { - AllocatorPerContext context = obtainContext(contextCfgItemName); + AllocatorPerContext context = obtainContext(contextCfgItemName, + diskValidator); return context.createTmpFileForWrite(pathStr, size, conf); } @@ -231,7 +242,8 @@ public static void removeContext(String contextCfgItemName) { * @throws IOException */ public boolean ifExists(String pathStr,Configuration conf) { - AllocatorPerContext context = obtainContext(contextCfgItemName); + AllocatorPerContext context = obtainContext(contextCfgItemName, + diskValidator); return context.ifExists(pathStr, conf); } @@ -240,7 +252,8 @@ public boolean ifExists(String pathStr,Configuration conf) { * @return the current directory index for the given configuration item. */ int getCurrentDirectoryIndex() { - AllocatorPerContext context = obtainContext(contextCfgItemName); + AllocatorPerContext context = obtainContext(contextCfgItemName, + diskValidator); return context.getCurrentDirectoryIndex(); } @@ -255,6 +268,7 @@ int getCurrentDirectoryIndex() { // NOTE: the context must be accessed via a local reference as it // may be updated at any time to reference a different context private AtomicReference currentContext; + private final DiskValidator diskValidator; private static class Context { private AtomicInteger dirNumLastAccessed = new AtomicInteger(0); @@ -280,9 +294,11 @@ public int getAndIncrDirNumLastAccessed(int delta) { } } - public AllocatorPerContext(String contextCfgItemName) { + public AllocatorPerContext(String contextCfgItemName, + DiskValidator diskValidator) { this.contextCfgItemName = contextCfgItemName; this.currentContext = new AtomicReference(new Context()); + this.diskValidator = diskValidator; } /** This method gets called everytime before any read/write to make sure @@ -312,7 +328,7 @@ private Context confChanged(Configuration conf) ? new File(ctx.localFS.makeQualified(tmpDir).toUri()) : new File(dirStrings[i]); - DiskChecker.checkDir(tmpFile); + diskValidator.checkStatus(tmpFile); dirs.add(new Path(tmpFile.getPath())); dfList.add(new DF(tmpFile, 30000)); } catch (DiskErrorException de) { @@ -348,7 +364,7 @@ private Path createPath(Path dir, String path, //check whether we are able to create a directory here. If the disk //happens to be RDONLY we will fail try { - DiskChecker.checkDir(new File(file.getParent().toUri().getPath())); + diskValidator.checkStatus(new File(file.getParent().toUri().getPath())); return file; } catch (DiskErrorException d) { LOG.warn("Disk Error Exception: ", d);