diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 1ab5a37..596b6f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,9 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import static java.util.concurrent.TimeUnit.*; import org.apache.hadoop.fs.Path; @@ -35,6 +40,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class DeletionService extends AbstractService { @@ -42,7 +48,9 @@ private int debugDelay; private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; - private final FileContext lfs = getLfs(); + private static final FileContext lfs = getLfs(); + private final HashMap> deletionTaskDependencyMap; + static final FileContext getLfs() { try { return FileContext.getLocalFSFileContext(); @@ -55,6 +63,8 @@ public DeletionService(ContainerExecutor exec) { super(DeletionService.class.getName()); this.exec = exec; this.debugDelay = 0; + this.deletionTaskDependencyMap = + new HashMap>(); } /** @@ -68,11 +78,85 @@ 
public DeletionService(ContainerExecutor exec) {
   public void delete(String user, Path subDir, Path... baseDirs) {
     // TODO if parent owned by NM, rename within parent inline
     if (debugDelay != -1) {
-      sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay,
-          TimeUnit.SECONDS);
+      sched.schedule(new FileDeletion(this, user, subDir, baseDirs),
+          debugDelay, TimeUnit.SECONDS);
     }
   }
-  
+
+  public void deleteHelper(FileDeletion fileDeletion) {
+    if (debugDelay != -1) {
+      sched.schedule(fileDeletion, debugDelay, TimeUnit.SECONDS);
+    }
+  }
+
+  /** Starts or cancels the tasks registered as dependent on fileDeletion. */
+  public void fileDeletionFinished(FileDeletion fileDeletion, boolean success) {
+    synchronized (this.deletionTaskDependencyMap) {
+      List<FileDeletion> dependentTasks =
+          this.deletionTaskDependencyMap.remove(fileDeletion);
+      if (dependentTasks == null) {
+        return; // No task depends on this one (e.g. a plain delete()).
+      }
+      for (FileDeletion dependentTask : dependentTasks) {
+        int count = dependentTask.decrementAndGetDependentTaskCount();
+        if (success && count == 0) {
+          // Last outstanding parent succeeded, so the child can run now.
+          LOG.info("Starting dependentTask (parent task succeeded)"
+              + dependentTask);
+          deleteHelper(dependentTask);
+        } else if (!success) {
+          // Parent failed: cancel the child if configured, else run it at 0.
+          if (dependentTask.getCancelIfAnyDependentTaskFailedFlag()) {
+            LOG.info("Cancelling dependentTask " + dependentTask);
+            fileDeletionFinished(dependentTask, false);
+          } else if (count == 0) {
+            LOG.info("Starting dependentTask (even though parent task failed)"
+                + dependentTask);
+            deleteHelper(dependentTask);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This is to be used to define Deletion task dependency.
+   * Ex. we have tasks 1,2,3,4,5,6 to be executed.
+   * Dependency is child -> parent
+   * 4 -> 1,2
+   * 3 -> 1,2
+   * 5 -> 4
+   * 6 -> 3,4
+   * To specify this use
+   * parentTasks(1,2) , childDependentTasks(3,4)
+   * parentTasks(4) , childDependentTasks(5,6)
+   * parentTasks(3) , childDependentTasks(6)
+   *
+   * Once dependency is defined call deleteHelper(FileDeletion) for task 1
+   * and 2. Deletion service will take care of executing dependent tasks as
+   * per the mapping specified.
+   *
+   * @param parentTasks tasks that have to finish before the children run
+   * @param childDependentTasks tasks that wait on every listed parent task
+   */
+  public void populateFileDeletionTaskDependency(List<FileDeletion> parentTasks,
+      List<FileDeletion> childDependentTasks) {
+    synchronized (deletionTaskDependencyMap) {
+      for (FileDeletion parentTask : parentTasks) {
+        if (!this.deletionTaskDependencyMap.containsKey(parentTask)) {
+          this.deletionTaskDependencyMap.put(parentTask,
+              new ArrayList<FileDeletion>());
+        }
+        List<FileDeletion> dependentTaskList =
+            this.deletionTaskDependencyMap.get(parentTask);
+        for (FileDeletion childDependencyTask : childDependentTasks) {
+          childDependencyTask.incrementAndGetDependentTaskCount();
+          dependentTaskList.add(childDependencyTask);
+        }
+      }
+    }
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     ThreadFactory tf = new ThreadFactoryBuilder()
@@ -118,46 +202,133 @@ public boolean isTerminated() {
     return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 
-  private class FileDeletion implements Runnable {
-    final String user;
-    final Path subDir;
-    final Path[] baseDirs;
-    FileDeletion(String user, Path subDir, Path[] baseDirs) {
+  /**
+   * A single deletion request. Once it has run it reports its outcome to
+   * the owning DeletionService so dependent tasks can be started or cancelled.
+   */
+  public static class FileDeletion implements Runnable {
+    private final String user;
+    private final Path subDir;
+    private final Path[] baseDirs;
+    private final boolean cancelIfAnyDependentTaskFailed;
+    private final AtomicInteger numberOfDependentTask;
+    private final DeletionService delService;
+
+    private FileDeletion(DeletionService delService, String user, Path subDir,
+        Path[] baseDirs) {
+      this(delService, user, subDir, baseDirs, false);
+    }
+
+    private FileDeletion(DeletionService delService, String user, Path subDir,
+        Path[] baseDirs, boolean cancelIfAnyDependentTaskFailed) {
+      this.delService = delService;
       this.user = user;
       this.subDir = subDir;
       this.baseDirs = baseDirs;
+      this.cancelIfAnyDependentTaskFailed = cancelIfAnyDependentTaskFailed;
+      this.numberOfDependentTask = new AtomicInteger(0);
     }
+
+    /** Records one more task that this task has to wait for. */
+    public int incrementAndGetDependentTaskCount() {
+      return numberOfDependentTask.incrementAndGet();
+    }
+
+    /** Records that one of the tasks this task waits for has finished. */
+    public int decrementAndGetDependentTaskCount() {
+      return numberOfDependentTask.decrementAndGet();
+    }
+
+    public boolean getCancelIfAnyDependentTaskFailedFlag() {
+      return this.cancelIfAnyDependentTaskFailed;
+    }
+
+    @VisibleForTesting
+    public String getUser() {
+      return this.user;
+    }
+
+    @VisibleForTesting
+    public Path getSubDir() {
+      return this.subDir;
+    }
+
+    @VisibleForTesting
+    public Path[] getBaseDirs() {
+      return this.baseDirs;
+    }
+
+    /**
+     * Deletes the target through the local FileContext when user is null and
+     * through the ContainerExecutor otherwise, then reports the outcome.
+     */
     @Override
     public void run() {
+      boolean error = false;
       if (null == user) {
         if (baseDirs == null || baseDirs.length == 0) {
           LOG.debug("NM deleting absolute path : " + subDir);
           try {
             lfs.delete(subDir, true);
           } catch (IOException e) {
-            LOG.warn("Failed to delete " + subDir);
+            error = true;
+            LOG.warn("Failed to delete " + subDir, e);
           }
-          return;
-        }
-        for (Path baseDir : baseDirs) {
-          Path del = subDir == null? baseDir : new Path(baseDir, subDir);
-          LOG.debug("NM deleting path : " + del);
-          try {
-            lfs.delete(del, true);
-          } catch (IOException e) {
-            LOG.warn("Failed to delete " + subDir);
+        } else {
+          for (Path baseDir : baseDirs) {
+            Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+            LOG.debug("NM deleting path : " + del);
+            try {
+              lfs.delete(del, true);
+            } catch (IOException e) {
+              error = true;
+              // Report the path that actually failed, not just subDir.
+              LOG.warn("Failed to delete " + del, e);
+            }
           }
         }
       } else {
         try {
           LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
-          exec.deleteAsUser(user, subDir, baseDirs);
+          delService.exec.deleteAsUser(user, subDir, baseDirs);
         } catch (IOException e) {
+          error = true;
           LOG.warn("Failed to delete as user " + user, e);
         } catch (InterruptedException e) {
+          error = true;
           LOG.warn("Failed to delete as user " + user, e);
+          // Keep the interrupt visible to the pool thread running this task.
+          Thread.currentThread().interrupt();
         }
       }
+      delService.fileDeletionFinished(this, !error);
     }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("\nFileDeletion : ");
+      sb.append(" user : ").append(this.user);
+      sb.append(" subDir : ").append(
+          subDir == null ? "null" : subDir.toString());
+      sb.append(" baseDir : ");
+      if (baseDirs == null || baseDirs.length == 0) {
+        sb.append("null");
+      } else {
+        for (Path baseDir : baseDirs) {
+          sb.append(baseDir.toString()).append(',');
+        }
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Creates, but does not schedule, a deletion task. Schedule it with
+   * deleteHelper, optionally after populateFileDeletionTaskDependency.
+   */
+  public FileDeletion createFileDeletion(String user, Path subDir,
+      Path[] baseDirs, boolean cancelIfAnyDependentTaskFailed) {
+    return new FileDeletion(this, user, subDir, baseDirs,
+        cancelIfAnyDependentTaskFailed);
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 3324ddc..4574139 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -82,6 +82,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletion;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -1094,6 +1095,7 @@ private void deleteLocalDir(FileContext lfs, DeletionService del,
         try {
           if (status.getPath().getName().matches(".*" +
               ContainerLocalizer.USERCACHE + "_DEL_.*")) {
+            LOG.info("usercache path : " + status.getPath().toString());
             cleanUpFilesFromSubDir(lfs, del, status.getPath());
           } else if (status.getPath().getName()
               .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
@@ -1114,14 +1116,37 @@ private void deleteLocalDir(FileContext lfs, DeletionService del,
   private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
       Path dirPath) throws IOException {
     RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
+    List<FileDeletion> parentDeletionTasks =
+        new ArrayList<FileDeletion>();
     if (fileStatus != null) {
       while (fileStatus.hasNext()) {
         FileStatus status = fileStatus.next();
         String owner = status.getOwner();
-        del.delete(owner, status.getPath(), new Path[] {});
+        FileDeletion deletionTask =
+            del.createFileDeletion(owner, null,
+                new Path[] { status.getPath() }, true);
+        LOG.info(deletionTask);
+        parentDeletionTasks.add(deletionTask);
       }
     }
-    del.delete(null, dirPath, new Path[] {});
+    FileDeletion dependentDeletionTask =
+        del.createFileDeletion(null, dirPath, new Path[] {}, true);
+    LOG.info(dependentDeletionTask);
+    if (parentDeletionTasks.isEmpty()) {
+      // No entry will ever finish and release the dependent task, so the
+      // (empty) directory has to be scheduled for deletion directly.
+      del.deleteHelper(dependentDeletionTask);
+      return;
+    }
+    // dirPath itself is deleted only after every entry below it is done.
+    List<FileDeletion> dependentDeletionTasks =
+        new ArrayList<FileDeletion>();
+    dependentDeletionTasks.add(dependentDeletionTask);
+    del.populateFileDeletionTaskDependency(parentDeletionTasks,
+        dependentDeletionTasks);
+    for (FileDeletion parentTask : parentDeletionTasks) {
+      del.deleteHelper(parentTask);
+    }
   }
 
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
index 3b88b03..091036b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletion;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -69,8 +70,8 @@
 
 public class TestNodeManagerReboot {
 
-  static final File basedir =
-      new File("target", TestNodeManagerReboot.class.getName());
+  static final File basedir = new File("target",
+      TestNodeManagerReboot.class.getName());
   static final File logsDir = new File(basedir, "logs");
   static final File nmLocalDir = new File(basedir, "nm0");
   static final File localResourceDir = new File(basedir, "resource");
@@ -100,7 +101,8 @@ public void
testClearLocalDirWhenNodeReboot() throws IOException, nm = new MyNodeManager(); nm.start(); - final ContainerManagementProtocol containerManager = nm.getContainerManager(); + final ContainerManagementProtocol containerManager = + nm.getContainerManager(); // create files under fileCache createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100); @@ -112,11 +114,10 @@ public void testClearLocalDirWhenNodeReboot() throws IOException, ContainerId cId = createContainerId(); URL localResourceUri = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(localResourceDir.getAbsolutePath()))); + ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path( + localResourceDir.getAbsolutePath()))); - LocalResource localResource = - Records.newRecord(LocalResource.class); + LocalResource localResource = Records.newRecord(LocalResource.class); localResource.setResource(localResourceUri); localResource.setSize(-1); localResource.setVisibility(LocalResourceVisibility.APPLICATION); @@ -129,7 +130,7 @@ public void testClearLocalDirWhenNodeReboot() throws IOException, containerLaunchContext.setLocalResources(localResources); List commands = new ArrayList(); containerLaunchContext.setCommands(commands); - + final StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); @@ -137,8 +138,9 @@ public void testClearLocalDirWhenNodeReboot() throws IOException, startRequest.setContainerToken(TestContainerManager.createContainerToken( cId, 0, nodeId, destinationFile, nm.getNMContext() .getContainerTokenSecretManager())); - final UserGroupInformation currentUser = UserGroupInformation - .createRemoteUser(cId.getApplicationAttemptId().toString()); + final UserGroupInformation currentUser = + UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId() + .toString()); NMTokenIdentifier nmIdentifier = new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, 
user, 123); currentUser.addTokenIdentifier(nmIdentifier); @@ -170,27 +172,31 @@ public Void run() throws YarnException, IOException { Assert.assertEquals(ContainerState.DONE, container.getContainerState()); - Assert.assertTrue( - "The container should create a subDir named currentUser: " + user + - "under localDir/usercache", + Assert + .assertTrue( + "The container should create a subDir named currentUser: " + user + + "under localDir/usercache", numOfLocalDirs(nmLocalDir.getAbsolutePath(), - ContainerLocalizer.USERCACHE) > 0); + ContainerLocalizer.USERCACHE) > 0); - Assert.assertTrue("There should be files or Dirs under nm_private when " + - "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), + Assert.assertTrue( + "There should be files or Dirs under nm_private when " + + "container is launched", + numOfLocalDirs(nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0); // restart the NodeManager nm.stop(); nm = new MyNodeManager(); - nm.start(); + nm.start(); numTries = 0; - while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer - .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(), - ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir - .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) - > 0) && numTries < MAX_TRIES) { + while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.USERCACHE) > 0 + || numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs( + nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0) + && numTries < MAX_TRIES) { try { Thread.sleep(500); } catch (InterruptedException ex) { @@ -199,21 +205,27 @@ public Void run() throws YarnException, IOException { numTries++; } - Assert.assertTrue("After NM reboots, all local files should be deleted", - numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer - .USERCACHE) == 0 && 
numOfLocalDirs(nmLocalDir.getAbsolutePath(), - ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir - .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) - == 0); + Assert + .assertTrue( + "After NM reboots, all local files should be deleted", + numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.USERCACHE) == 0 + && numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.FILECACHE) == 0 + && numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ResourceLocalizationService.NM_PRIVATE_DIR) == 0); verify(delService, times(1)).delete( - (String) isNull(), - argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR - + "_DEL_"))); + (String) isNull(), + argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR + + "_DEL_"))); verify(delService, times(1)).delete((String) isNull(), - argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); - verify(delService, times(1)).delete((String) isNull(), - argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_"))); - + argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); + verify(delService, times(1)).deleteHelper( + argThat(new FileDeletionInclude(user, null, + new String[] { destinationFile }))); + verify(delService, times(1)).deleteHelper( + argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE + + "_DEL_", new String[] {}))); } private int numOfLocalDirs(String localDir, String localSubDir) { @@ -238,7 +250,8 @@ private void createFiles(String dir, String subDir, int numOfFiles) { private ContainerId createContainerId() { ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newInstance(appAttemptId, 0); return containerId; } @@ -253,8 +266,8 @@ public MyNodeManager() { @Override protected NodeStatusUpdater 
createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
-          context, dispatcher, healthChecker, metrics);
+      MockNodeStatusUpdater myNodeStatusUpdater =
+          new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
       return myNodeStatusUpdater;
     }
 
@@ -288,4 +301,63 @@ public boolean matches(Object o) {
       return ((Path) o).getName().indexOf(part) != -1;
     }
   }
+
+  /**
+   * Matches a FileDeletion by exact user and by substring of its subDir and
+   * of each of its baseDirs (compared position by position).
+   */
+  class FileDeletionInclude extends ArgumentMatcher<FileDeletion> {
+    final String user;
+    final String subDirIncludes;
+    final String[] baseDirIncludes;
+
+    public FileDeletionInclude(String user, String subDirIncludes,
+        String [] baseDirIncludes) {
+      this.user = user;
+      this.subDirIncludes = subDirIncludes;
+      this.baseDirIncludes = baseDirIncludes;
+    }
+
+    // A mismatch is reported by returning false. The matcher may be evaluated
+    // against calls other than the expected one, so asserting here would
+    // abort the test with an AssertionError when assertions are enabled.
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof FileDeletion)) {
+        return false;
+      }
+      FileDeletion fd = (FileDeletion) o;
+      if (fd.getUser() == null ? user != null : !fd.getUser().equals(user)) {
+        return false;
+      }
+      if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
+        return false;
+      }
+      Path[] baseDirs = fd.getBaseDirs();
+      if (baseDirIncludes == null || baseDirs == null) {
+        return baseDirIncludes == null && baseDirs == null;
+      }
+      if (baseDirIncludes.length != baseDirs.length) {
+        return false;
+      }
+      for (int i = 0; i < baseDirIncludes.length; i++) {
+        if (!comparePaths(baseDirs[i], baseDirIncludes[i])) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** Returns true when both are null or the path of p1 contains p2. */
+    public boolean comparePaths(Path p1, String p2) {
+      if (p1 == null || p2 == null) {
+        return p1 == null && p2 == null;
+      }
+      if (!p1.toUri().getPath().contains(p2)) {
+        LOG.info("found : " + p1.toString() + " Expected : " + p2);
+        return false;
+      }
+      return true;
+    }
+  }
 }