diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index 1c216f430af..6abc589e331 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -24,8 +24,6 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -78,11 +76,19 @@
/** Used when size of file to be allocated is unknown. */
public static final int SIZE_UNKNOWN = -1;
+ private final DiskValidator diskValidator;
+
/**Create an allocator object
* @param contextCfgItemName
*/
public LocalDirAllocator(String contextCfgItemName) {
this.contextCfgItemName = contextCfgItemName;
+ try {
+ this.diskValidator = DiskValidatorFactory.getInstance(
+ BasicDiskValidator.NAME);
+ } catch (DiskErrorException e) {
+ throw new RuntimeException(e);
+ }
}
/** This method must be used to obtain the dir allocation context for a
@@ -91,12 +97,14 @@ public LocalDirAllocator(String contextCfgItemName) {
* dir allocations (e.g., mapred.local.dir). The method will
* create a context for that name if it doesn't already exist.
*/
- private AllocatorPerContext obtainContext(String contextCfgItemName) {
+ private AllocatorPerContext obtainContext(String contextCfgItemName,
+ DiskValidator diskValidator) {
synchronized (contexts) {
AllocatorPerContext l = contexts.get(contextCfgItemName);
if (l == null) {
contexts.put(contextCfgItemName,
- (l = new AllocatorPerContext(contextCfgItemName)));
+ (l = new AllocatorPerContext(contextCfgItemName,
+ diskValidator)));
}
return l;
}
@@ -148,7 +156,8 @@ public Path getLocalPathForWrite(String pathStr, long size,
public Path getLocalPathForWrite(String pathStr, long size,
Configuration conf,
boolean checkWrite) throws IOException {
- AllocatorPerContext context = obtainContext(contextCfgItemName);
+ AllocatorPerContext context = obtainContext(contextCfgItemName,
+ diskValidator);
return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
}
@@ -162,7 +171,8 @@ public Path getLocalPathForWrite(String pathStr, long size,
*/
public Path getLocalPathToRead(String pathStr,
Configuration conf) throws IOException {
- AllocatorPerContext context = obtainContext(contextCfgItemName);
+ AllocatorPerContext context = obtainContext(contextCfgItemName,
+ diskValidator);
return context.getLocalPathToRead(pathStr, conf);
}
@@ -178,7 +188,7 @@ public Path getLocalPathToRead(String pathStr,
) throws IOException {
AllocatorPerContext context;
synchronized (this) {
- context = obtainContext(contextCfgItemName);
+ context = obtainContext(contextCfgItemName, diskValidator);
}
return context.getAllLocalPathsToRead(pathStr, conf);
}
@@ -196,7 +206,8 @@ public Path getLocalPathToRead(String pathStr,
*/
public File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException {
- AllocatorPerContext context = obtainContext(contextCfgItemName);
+ AllocatorPerContext context = obtainContext(contextCfgItemName,
+ diskValidator);
return context.createTmpFileForWrite(pathStr, size, conf);
}
@@ -231,7 +242,8 @@ public static void removeContext(String contextCfgItemName) {
* @throws IOException
*/
public boolean ifExists(String pathStr,Configuration conf) {
- AllocatorPerContext context = obtainContext(contextCfgItemName);
+ AllocatorPerContext context = obtainContext(contextCfgItemName,
+ diskValidator);
return context.ifExists(pathStr, conf);
}
@@ -240,7 +252,8 @@ public boolean ifExists(String pathStr,Configuration conf) {
* @return the current directory index for the given configuration item.
*/
int getCurrentDirectoryIndex() {
- AllocatorPerContext context = obtainContext(contextCfgItemName);
+ AllocatorPerContext context = obtainContext(contextCfgItemName,
+ diskValidator);
return context.getCurrentDirectoryIndex();
}
@@ -255,6 +268,7 @@ int getCurrentDirectoryIndex() {
// NOTE: the context must be accessed via a local reference as it
// may be updated at any time to reference a different context
private AtomicReference currentContext;
+ private final DiskValidator diskValidator;
private static class Context {
private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
@@ -280,9 +294,11 @@ public int getAndIncrDirNumLastAccessed(int delta) {
}
}
- public AllocatorPerContext(String contextCfgItemName) {
+ public AllocatorPerContext(String contextCfgItemName,
+ DiskValidator diskValidator) {
this.contextCfgItemName = contextCfgItemName;
this.currentContext = new AtomicReference(new Context());
+ this.diskValidator = diskValidator;
}
/** This method gets called everytime before any read/write to make sure
@@ -312,7 +328,7 @@ private Context confChanged(Configuration conf)
? new File(ctx.localFS.makeQualified(tmpDir).toUri())
: new File(dirStrings[i]);
- DiskChecker.checkDir(tmpFile);
+ diskValidator.checkStatus(tmpFile);
dirs.add(new Path(tmpFile.getPath()));
dfList.add(new DF(tmpFile, 30000));
} catch (DiskErrorException de) {
@@ -348,7 +364,7 @@ private Path createPath(Path dir, String path,
//check whether we are able to create a directory here. If the disk
//happens to be RDONLY we will fail
try {
- DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
+ diskValidator.checkStatus(new File(file.getParent().toUri().getPath()));
return file;
} catch (DiskErrorException d) {
LOG.warn("Disk Error Exception: ", d);