diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java index a36a7a0..2881932 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java @@ -23,6 +23,7 @@ import java.nio.file.DirectoryStream; import java.nio.file.DirectoryIteratorException; import java.nio.file.Files; +import java.util.concurrent.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -52,7 +53,64 @@ public DiskOutOfSpaceException(String msg) { super(msg); } } - + + public static class DiskTimeoutException extends IOException { + public DiskTimeoutException(String msg) { + super(msg); + } + } + + public static class DiskAsyncIOException extends IOException { + public DiskAsyncIOException(String msg) { + super(msg); + } + public DiskAsyncIOException(String msg, Throwable cause) { + super(msg, cause); + } + } + + private static ExecutorService asyncIOExecutor = Executors.newCachedThreadPool(); + private static ConcurrentHashMap inprogressMkdirs = + new ConcurrentHashMap(); + + private class AsyncMkdirCallable implements Callable { + private File file; + public AsyncMkdirCallable(File file) { + this.file = file; + } + public Boolean call() throws Exception { + String canon = file.getCanonicalPath(); + boolean inprogress = inprogressMkdirs.putIfAbsent(canon, true); + if (inprogress) { + throw new DiskAsyncIOException("mkdir in-progress: " + file); + } + boolean result = file.mkdir(); + inprogressMkdirs.remove(canon); + return result; + } + } + + public boolean mkdir(File file, long timeout, TimeUnit unit) throws IOException { + if (timeout == 0) { + return file.mkdir(); + } + Future future = asyncIOExecutor.submit(new AsyncMkdirCallable(file)); + Boolean result = null; + while ( result == null ) { + try { + result = future.get(timeout, unit); + } catch (InterruptedException ie) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + throw new DiskAsyncIOException("could not execute future for " + file, ee); + } catch (TimeoutException e) { + throw new DiskTimeoutException("timed out for " + file); + } + } + return result; + } + /** * The semantics of mkdirsWithExistsCheck method is different from the mkdirs * method provided in the Sun's java.io.File class in the following way: