diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 1ab5a37..107a3b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,9 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import static java.util.concurrent.TimeUnit.*; import org.apache.hadoop.fs.Path; @@ -42,7 +47,9 @@ private int debugDelay; private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; - private final FileContext lfs = getLfs(); + private static final FileContext lfs = getLfs(); + private final HashMap> deletionTaskDependencyMap; + static final FileContext getLfs() { try { return FileContext.getLocalFSFileContext(); @@ -55,6 +62,8 @@ public DeletionService(ContainerExecutor exec) { super(DeletionService.class.getName()); this.exec = exec; this.debugDelay = 0; + this.deletionTaskDependencyMap = + new HashMap>(); } /** @@ -67,12 +76,83 @@ public DeletionService(ContainerExecutor exec) { */ public void delete(String user, Path subDir, Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline + delete(new FileDeletion(this, user, subDir, baseDirs)); + } + + public void delete(FileDeletion fileDeletion) { if (debugDelay != -1) { - sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay, - TimeUnit.SECONDS); + sched.schedule(fileDeletion, debugDelay, TimeUnit.SECONDS); } } - + + public void fileDeletionFinished(FileDeletion fileDeletion, + boolean success) { + synchronized (this.deletionTaskDependencyMap) { + List dependentTasks = + this.deletionTaskDependencyMap.remove(fileDeletion); + for (FileDeletion dependentTask : dependentTasks) { + int count = dependentTask.decrementAndGetDependentTaskCount(); + if (success && count == 0) { + // This task finished successfully and child dependent task is not + // waiting for any more tasks. + LOG.info("Starting dependentTask (parent task succeeded)" + + fileDeletion); + delete(dependentTask); + } else if (!success) { + // This task failed. If child task is configured to execute even if + // parent task failed then execute it if count drops to 0 else mark + // this child task as also failed. + if (dependentTask.getCancelIfAnyDependentTaskFailedFlag()) { + LOG.info("Cancelling dependentTask " + fileDeletion); + fileDeletionFinished(dependentTask, false); + } else if (count == 0) { + LOG.info("Starting dependentTask (even though parent task failed)" + + fileDeletion); + delete(dependentTask); + } + } + } + } + } + + /** + * This is to be used to define Deletion task dependency. + * Ex. we have tasks 1,2,3,4,5,6 to be executed. + * Dependency is child -> parent + * 4 -> 1,2 + * 3 -> 1,2 + * 5 -> 4 + * 6 -> 3,4 + * To specify this use + * parentTasks(1,2) , childDependentTasks(3,4) + * parentTasks(4) , childDependentTasks(5,6) + * parentTasks(3) , childDependentTasks(6) + * + * Once dependency is defined call delete(FileDeletion) for task 1 and 2. + * Deletion service will take care of executing dependent tasks as per the + * mapping specified. + * + * @param parentTasks + * @param childDependentTasks + */ + public void populateFileDeletionTaskDependency(List parentTasks, + List childDependentTasks) { + synchronized (deletionTaskDependencyMap) { + for (FileDeletion parentTask : parentTasks) { + if (!this.deletionTaskDependencyMap.containsKey(parentTask)) { + this.deletionTaskDependencyMap.put(parentTask, + new ArrayList()); + } + List dependentTaskList = + this.deletionTaskDependencyMap.get(parentTask); + for (FileDeletion childDependencyTask : childDependentTasks) { + childDependencyTask.incrementAndGetDependentTaskCount(); + dependentTaskList.add(childDependencyTask); + } + } + } + } + @Override protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder() @@ -118,46 +198,106 @@ public boolean isTerminated() { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } - private class FileDeletion implements Runnable { - final String user; - final Path subDir; - final Path[] baseDirs; - FileDeletion(String user, Path subDir, Path[] baseDirs) { + public static class FileDeletion implements Runnable { + private final String user; + private final Path subDir; + private final Path[] baseDirs; + private final boolean cancelIfAnyDependentTaskFailed; + private final AtomicInteger numberOfDependentTask; + private final DeletionService delService; + + private FileDeletion(DeletionService delService, String user, Path subDir, + Path[] baseDirs) { + this.delService = delService; this.user = user; this.subDir = subDir; this.baseDirs = baseDirs; + this.cancelIfAnyDependentTaskFailed = false; + this.numberOfDependentTask = new AtomicInteger(0); } + + private FileDeletion(DeletionService delService, String user, Path subDir, + Path[] baseDirs, boolean cancelIfAnyDependentTaskFailed) { + this.delService = delService; + this.user = user; + this.subDir = subDir; + this.baseDirs = baseDirs; + this.cancelIfAnyDependentTaskFailed = cancelIfAnyDependentTaskFailed; + this.numberOfDependentTask = new AtomicInteger(0); + } + + public int incrementAndGetDependentTaskCount() { + return numberOfDependentTask.incrementAndGet(); + } + + public int decrementAndGetDependentTaskCount() { + return numberOfDependentTask.decrementAndGet(); + } + + public boolean getCancelIfAnyDependentTaskFailedFlag() { + return this.cancelIfAnyDependentTaskFailed; + } + @Override public void run() { + boolean error = false; if (null == user) { if (baseDirs == null || baseDirs.length == 0) { LOG.debug("NM deleting absolute path : " + subDir); try { lfs.delete(subDir, true); } catch (IOException e) { + error = true; LOG.warn("Failed to delete " + subDir); } - return; - } - for (Path baseDir : baseDirs) { - Path del = subDir == null? baseDir : new Path(baseDir, subDir); - LOG.debug("NM deleting path : " + del); - try { - lfs.delete(del, true); - } catch (IOException e) { - LOG.warn("Failed to delete " + subDir); + } else { + for (Path baseDir : baseDirs) { + Path del = subDir == null? baseDir : new Path(baseDir, subDir); + LOG.debug("NM deleting path : " + del); + try { + lfs.delete(del, true); + } catch (IOException e) { + error = true; + LOG.warn("Failed to delete " + subDir); + } } } } else { try { LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]"); - exec.deleteAsUser(user, subDir, baseDirs); + delService.exec.deleteAsUser(user, subDir, baseDirs); } catch (IOException e) { + error = true; LOG.warn("Failed to delete as user " + user, e); } catch (InterruptedException e) { + error = true; LOG.warn("Failed to delete as user " + user, e); } } + delService.fileDeletionFinished(this, !error); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer("\nFileDeletion : "); + sb.append(" user : ").append(this.user); + sb.append(" subDir : ").append( + subDir == null ? "null" : subDir.toString()); + sb.append(" baseDir : "); + if (baseDirs == null || baseDirs.length == 0) { + sb.append("null"); + } else { + for (Path baseDir : baseDirs) { + sb.append(baseDir.toString()).append(','); + } + } + return sb.toString(); } } -} + + public FileDeletion createFileDeletion(String user, Path subDir, + Path[] baseDirs, boolean cancelIfAnyDependentTaskFailed) { + return new FileDeletion(this, user, subDir, baseDirs, + cancelIfAnyDependentTaskFailed); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 3324ddc..9635d8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletion; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; @@ -1094,6 +1095,7 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, try { if (status.getPath().getName().matches(".*" + ContainerLocalizer.USERCACHE + "_DEL_.*")) { + LOG.info("usercache path : " + status.getPath().toString()); cleanUpFilesFromSubDir(lfs, del, status.getPath()); } else if (status.getPath().getName() .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") @@ -1114,14 +1116,28 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del, Path dirPath) throws IOException { RemoteIterator fileStatus = lfs.listStatus(dirPath); + List parentDeletionTasks = + new ArrayList(); if (fileStatus != null) { while (fileStatus.hasNext()) { FileStatus status = fileStatus.next(); String owner = status.getOwner(); - del.delete(owner, status.getPath(), new Path[] {}); + FileDeletion deletionTask = + del.createFileDeletion(owner, null, + new Path[] { status.getPath() }, true); + parentDeletionTasks.add(deletionTask); } } - del.delete(null, dirPath, new Path[] {}); + FileDeletion dependentDeletionTask = + del.createFileDeletion(null, dirPath, new Path[] {}, true); + List dependentDeletionTasks = + new ArrayList(); + dependentDeletionTasks.add(dependentDeletionTask); + del.populateFileDeletionTaskDependency(parentDeletionTasks, + dependentDeletionTasks); + for (FileDeletion parentTask : parentDeletionTasks) { + del.delete(parentTask); + } } }