diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 6d4bccd..4b68a64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -87,6 +87,28 @@ public static ContainerLaunchContext newInstance( return container; } + @Public + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, + ContainerRetryContext containerRetryContext, + int deleteDelaySec) { + ContainerLaunchContext container = + Records.newRecord(ContainerLaunchContext.class); + container.setLocalResources(localResources); + container.setEnvironment(environment); + container.setCommands(commands); + container.setServiceData(serviceData); + container.setTokens(tokens); + container.setApplicationACLs(acls); + container.setContainerRetryContext(containerRetryContext); + container.setDeleteDelaySec(deleteDelaySec); + return container; + } + /** * Get all the tokens needed by this container. It may include file-system * tokens, ApplicationMaster related tokens if this container is an @@ -225,7 +247,23 @@ public static ContainerLaunchContext newInstance( * relaunch container. */ @Public - @Unstable + @Stable public abstract void setContainerRetryContext( ContainerRetryContext containerRetryContext); + + /** + * Get the delay before deleting resources to ease debugging of NM issues. + * @return delay before deleting resources in seconds. + */ + @Public + @Unstable + public abstract int getDeleteDelaySec(); + + /** + * Set the delay before deleting resources to ease debugging of NM issues. + * @param deleteDelaySec delay before deleting resources in seconds + */ + @Public + @Stable + public abstract void setDeleteDelaySec(int deleteDelaySec); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 1efe541..4be3755 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -59,6 +59,7 @@ private List commands = null; private Map applicationACLS = null; private ContainerRetryContext containerRetryContext = null; + private int deleteDelaySec = 0; public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); @@ -507,4 +508,22 @@ private ContainerRetryContextProto convertToProtoFormat( ContainerRetryContext t) { return ((ContainerRetryContextPBImpl)t).getProto(); } + /** + * Get the delay before deleting resources to ease debugging of NM issues. + * @return delay before deleting resources in seconds. + */ + public int getDeleteDelaySec() + { + return this.deleteDelaySec; + } + + /** + * Set the delay before deleting resources to ease debugging of NM issues. + * @param deleteDelaySec delay before deleting resources in seconds + */ + public void setDeleteDelaySec(int deleteDelaySec) + { + this.deleteDelaySec = deleteDelaySec; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index db834b2..5703934 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -53,9 +54,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +/** + * This service is responsible for asynchronously deleting files with + * an optional delay + */ public class DeletionService extends AbstractService { + public static final int KEEP_FOREVER = -1; static final Log LOG = LogFactory.getLog(DeletionService.class); - private int debugDelay; + /** + * Default debug delay on all containers specified by the configuration + */ + private int debugDelayDefault; private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; private static final FileContext lfs = getLfs(); @@ -78,37 +87,71 @@ public DeletionService(ContainerExecutor exec, NMStateStoreService stateStore) { super(DeletionService.class.getName()); this.exec = exec; - this.debugDelay = 0; + this.debugDelayDefault = 0; this.stateStore = stateStore; } - + /** * Delete the path(s) as this user. * @param user The user to delete as, or the JVM user if null * @param subDir the sub directory name * @param baseDirs the base directories which contains the subDir's */ - public void delete(String user, Path subDir, Path... baseDirs) { + public void delete(String user, Path subDir, + Path... baseDirs) { + deleteWithDelay(user, 0, subDir, baseDirs); + } + + /** + * Delete the path(s) as this user with a custom retention policy. + * @param user The user to delete as, or the JVM user if null + * @param debugDelaySec Seconds to wait before deleting. Use 0 as default + * @param subDir the sub directory name + * @param baseDirs the base directories which contains the subDir's + */ + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline - if (debugDelay != -1) { - List baseDirList = null; - if (baseDirs != null && baseDirs.length != 0) { - baseDirList = Arrays.asList(baseDirs); - } - FileDeletionTask task = - new FileDeletionTask(this, user, subDir, baseDirList); - recordDeletionTaskInStateStore(task); - sched.schedule(task, debugDelay, TimeUnit.SECONDS); + List baseDirList = null; + if (baseDirs != null && baseDirs.length != 0) { + baseDirList = Arrays.asList(baseDirs); } + FileDeletionTask task = + new FileDeletionTask(this, user, subDir, baseDirList); + scheduleFileDeletionTask(task, debugDelaySec); } - + + /** + * Schedules a task that deletes a file. Do not use this function unless + * you know what you are doing. Use delete() instead + * @param fileDeletionTask task to schedule + */ public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { - if (debugDelay != -1) { - recordDeletionTaskInStateStore(fileDeletionTask); - sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); + scheduleFileDeletionTask(fileDeletionTask, 0); + } + + /** + * Schedules a task that deletes a file. Do not use this function unless + * you know what you are doing. Use deleteWithDelay() instead + * @param fileDeletionTask the task to schedule + * @param debugDelaySec optional delay in seconds for debugging (default to 0) + */ + public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask, + int debugDelaySec) { + if (debugDelaySec < -1) + { + throw new IllegalArgumentException("delay time out of bounds."); + } + + if (debugDelayDefault != KEEP_FOREVER && + debugDelaySec != KEEP_FOREVER) { + int effectiveDelay = Math.max(debugDelayDefault, debugDelaySec); + + recordDeletionTaskInStateStore(fileDeletionTask, effectiveDelay); + sched.schedule(fileDeletionTask, effectiveDelay, TimeUnit.SECONDS); } } - + @Override protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder() @@ -118,7 +161,7 @@ protected void serviceInit(Configuration conf) throws Exception { sched = new HadoopScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); - debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); + debugDelayDefault = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { sched = new HadoopScheduledThreadPoolExecutor( YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); @@ -430,7 +473,8 @@ private int generateTaskId() { return taskId; } - private void recordDeletionTaskInStateStore(FileDeletionTask task) { + private void recordDeletionTaskInStateStore(FileDeletionTask task, + long debugDelay) { if (!stateStore.canRecover()) { // optimize the case where we aren't really recording return; @@ -445,7 +489,7 @@ private void recordDeletionTaskInStateStore(FileDeletionTask task) { // store successors first to ensure task IDs have been generated for them for (FileDeletionTask successor : successors) { - recordDeletionTaskInStateStore(successor); + recordDeletionTaskInStateStore(successor, debugDelay); } DeletionServiceDeleteTaskProto.Builder builder = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index aee0862..46ec648 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import java.util.Date; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -40,4 +41,6 @@ String getFlowVersion(); long getFlowRunId(); + + Date getDelayedDeletionTime(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 531693e..83530f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import java.io.IOException; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import com.google.protobuf.ByteString; +import org.apache.commons.lang.time.DateUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.DataOutputBuffer; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; @@ -66,6 +69,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.yarn.server.nodemanager.DeletionService.KEEP_FOREVER; + /** * The state machine for the representation of an Application * within the NodeManager. @@ -92,6 +97,13 @@ new HashMap(); /** + * Containers may provide a retention policy on their local directories. + * We track the latest container deletion time here, so that we do not clean + * up the application directories before the container dirs are deleted + */ + private Date delayedDeletionTime = null; + + /** * The timestamp when the log aggregation has started for this application. * Used to determine the age of application log files during log aggregation. * When logAggregationRentention policy is enabled, log files older than @@ -467,6 +479,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) { ApplicationContainerFinishedEvent containerEvent = (ApplicationContainerFinishedEvent) event; + app.estimateRetention(containerEvent.getContainerID()); if (null == app.containers.remove(containerEvent.getContainerID())) { LOG.warn("Removing unknown " + containerEvent.getContainerID() + " from application " + app.toString()); @@ -528,6 +541,7 @@ public ApplicationState transition(ApplicationImpl app, (ApplicationContainerFinishedEvent) event; LOG.info("Removing " + containerFinishEvent.getContainerID() + " from application " + app.toString()); + app.estimateRetention(containerFinishEvent.getContainerID()); app.containers.remove(containerFinishEvent.getContainerID()); if (app.containers.isEmpty()) { @@ -639,4 +653,47 @@ public String getFlowVersion() { public long getFlowRunId() { return flowContext == null ? 0L : flowContext.getFlowRunId(); } + + /** + * Estimate retention policy based on the container stop time + * @param cId container ID that might extend the current retention period + */ + protected void estimateRetention(ContainerId cId) + { + Container c = containers.get(cId); + if (c != null) + { + ContainerLaunchContext context = c.getLaunchContext(); + if (context != null) + { + int deleteDelaySec = context.getDeleteDelaySec(); + if (deleteDelaySec == KEEP_FOREVER) { + // Pick the maximum period to mark forever. + // This makes the logic below more simple + delayedDeletionTime = new Date(Long.MAX_VALUE); + } + else if (deleteDelaySec > 0) { + // Wait for the container with the longest delay + Date newDeletionTime = DateUtils.addSeconds(new Date(), + context.getDeleteDelaySec()); + delayedDeletionTime = delayedDeletionTime == null ? + newDeletionTime : + delayedDeletionTime.before(newDeletionTime) ? + newDeletionTime : + delayedDeletionTime; + } + } + } + } + + /** + * The time until we need to keep contianer directories + * @return the longest delayed deletion time of all the containers in this app + */ + @Override + public Date getDelayedDeletionTime() + { + return delayedDeletionTime; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 2cf6ee9..891f09a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -50,6 +51,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang.time.DateUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -79,6 +81,7 @@ import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; @@ -553,6 +556,12 @@ private void handleCleanupContainerResources( String appIDStr = c.getContainerId().getApplicationAttemptId().getApplicationId() .toString(); + ContainerLaunchContext launchContext = c.getLaunchContext(); + int debugDelaySec = 0; + if (launchContext != null) + { + debugDelaySec = launchContext.getDeleteDelaySec(); + } // Try deleting from good local dirs and full local dirs because a dir might // have gone bad while the app was running(disk full). In addition @@ -566,14 +575,14 @@ private void handleCleanupContainerResources( Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); Path containerDir = new Path(appDir, containerIDStr); - submitDirForDeletion(userName, containerDir); + submitDirForDeletion(userName, containerDir, debugDelaySec); // Delete the nmPrivate container-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr); - submitDirForDeletion(null, containerSysDir); + submitDirForDeletion(null, containerSysDir, debugDelaySec); } dispatcher.getEventHandler().handle( @@ -581,10 +590,11 @@ private void handleCleanupContainerResources( ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); } - private void submitDirForDeletion(String userName, Path dir) { + private void submitDirForDeletion(String userName, Path dir, + int debugDelaySec) { try { lfs.getFileStatus(dir); - delService.delete(userName, dir, new Path[] {}); + delService.deleteWithDelay(userName, debugDelaySec, dir, new Path[] {}); } catch (UnsupportedFileSystemException ue) { LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); } catch (IOException ie) { @@ -620,6 +630,25 @@ private void handleDestroyApplicationResources(Application application) { // Delete the application directories userName = application.getUser(); appIDStr = application.toString(); + // Each container may specify a retention policy. We should wait for the + // last one before deleting the application directories. + int debugDelaySec = 0; + Date applicationCleanupTime = application.getDelayedDeletionTime(); + if (applicationCleanupTime != null) { + Date now = new Date(); + if (applicationCleanupTime.equals(new Date(Long.MAX_VALUE))) { + // The maximum value is reserved for keep forever + debugDelaySec = DeletionService.KEEP_FOREVER; + } + else if (now.before(applicationCleanupTime)) { + // There is a valid retention policy set on at least one container. + // Wait for the specified time rounded up to the nearest second. + debugDelaySec = + (int)(((applicationCleanupTime.getTime() - now.getTime + ()) + + 999) / 1000); + } + } for (String localDir : dirsHandler.getLocalDirsForCleanup()) { @@ -628,12 +657,12 @@ private void handleDestroyApplicationResources(Application application) { Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); - submitDirForDeletion(userName, appDir); + submitDirForDeletion(userName, appDir, debugDelaySec); // Delete the nmPrivate app-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); - submitDirForDeletion(null, appSysDir); + submitDirForDeletion(null, appSysDir, debugDelaySec); } // TODO: decrement reference counts of all resources associated with this diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index f72a606..17fd2cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -156,6 +156,39 @@ public void testLocalFilesCleanup() throws InterruptedException, } @Override + public void testDelayedDeletionContainerOnly() throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedDeletionContainerOnly(); + } + + @Override + public void testDelayedDeletion() throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedDeletion(); + } + + @Override + public void testDelayedKeep() throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedKeep(); + } + + @Override public void testContainerLaunchFromPreviousRM() throws InterruptedException, IOException, YarnException { // Don't run the test if the binary is not available. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 2e0bbe0..8371dfa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -18,11 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -40,6 +35,12 @@ import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestDeletionService { private static final FileContext lfs = getLfs(); @@ -179,8 +180,13 @@ public void testRelativeDelete() throws Exception { } } + /** + * Test, if we can disable deleting files + * remain + * @throws Exception + */ @Test - public void testNoDelete() throws Exception { + public void testCustomDisableDelete() throws Exception { Random r = new Random(); long seed = r.nextLong(); r.setSeed(seed); @@ -189,7 +195,8 @@ public void testNoDelete() throws Exception { createDirs(new Path("."), dirs); FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); Configuration conf = new Configuration(); - conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, -1); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, + DeletionService.KEEP_FOREVER); exec.setConf(conf); DeletionService del = new DeletionService(exec); try { @@ -199,12 +206,11 @@ public void testNoDelete() throws Exception { del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null); } - int msecToWait = 20 * 1000; + + // Limit the test execution time. No delay given, a deleted file + // would disappear by this time + Thread.sleep(1000); for (Path p : dirs) { - while (msecToWait > 0 && lfs.util().exists(p)) { - Thread.sleep(100); - msecToWait -= 100; - } assertTrue(lfs.util().exists(p)); } } finally { @@ -212,6 +218,88 @@ public void testNoDelete() throws Exception { } } + /** + * Test, if we can apply a retention policy on the fly + * @throws Exception + */ + @Test + public void testCustomRetentionPolicy() throws Exception { + int debugDelay = 1; + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("SEED: " + seed); + List dirs = buildDirs(r, base, 20); + createDirs(new Path("."), dirs); + FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); + Configuration conf = new Configuration(); + exec.setConf(conf); + DeletionService del = new DeletionService(exec); + try { + del.init(conf); + del.start(); + for (Path p : dirs) { + // Specify that the files should be kept for a second + del.deleteWithDelay( + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", + debugDelay, p, null); + } + + Thread.sleep(100); + // Assert that the files are still around + for (Path p : dirs) { + assertTrue(lfs.util().exists(p)); + } + // Wait for more than a second to get the files deleted + Thread.sleep(debugDelay * 1000 + 400); + for (Path p : dirs) { + assertFalse(lfs.util().exists(p)); + } + } finally { + del.stop(); + } + } + + /** + * Test, if we can catch API errors + * @throws Exception + */ + @Test + public void testAPIError() throws Exception { + // This is an invalid value + int debugDelay = -1000; + + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("SEED: " + seed); + List dirs = buildDirs(r, base, 20); + createDirs(new Path("."), dirs); + FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); + Configuration conf = new Configuration(); + exec.setConf(conf); + DeletionService del = new DeletionService(exec); + try { + del.init(conf); + del.start(); + for (Path p : dirs) { + del.deleteWithDelay( + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", + debugDelay, p, null); + } + + fail("-1000 is an invalid delay value"); + } + catch (IllegalArgumentException ex) { + assertEquals("delay time out of bounds.", ex.getMessage()); + } + catch (Exception ex) { + fail("Wrong type of exception thrown on invalid input"); + } finally { + del.stop(); + } + } + @Test public void testStopWithDelayedTasks() throws Exception { DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index d359c3d..a64cdc7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -264,7 +264,8 @@ protected NMTokenIdentifier selectNMTokenIdentifier( protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // Don't do any deletions. LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + ", baseDirs - " + Arrays.asList(baseDirs)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 0c083f2..0dadb7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.apache.hadoop.yarn.server.nodemanager.DeletionService.KEEP_FOREVER; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; @@ -31,6 +32,7 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -47,6 +49,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.ThreadUtil; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; @@ -92,6 +95,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; @@ -215,7 +219,7 @@ public void testContainerSetup() throws Exception { rsrc_alpha.setType(LocalResourceType.FILE); rsrc_alpha.setTimestamp(file.lastModified()); String destinationFile = "dest_file"; - Map localResources = + Map localResources = new HashMap(); localResources.put(destinationFile, rsrc_alpha); containerLaunchContext.setLocalResources(localResources); @@ -1143,6 +1147,203 @@ public void testLocalFilesCleanup() throws InterruptedException, targetFile.exists()); } + /** + * Verify whether the container dir exists or not + * @param appIDStr Application ID + * @param containerIDStr Container ID + * @param shouldExist Test existence or absence + * @param container Test container or application dir + */ + private void verifyContainerDir(String appIDStr, String containerIDStr, + boolean shouldExist, boolean container) { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + File containerDir = new File(appDir, containerIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + File containerSysDir = new File(appSysDir, containerIDStr); + + if (container) { + if (shouldExist) { + // ContainerDir should still exist + Assert.assertTrue("ContainerDir " + containerDir.getAbsolutePath() + + " doesn't exist!!", containerDir.exists()); + Assert.assertTrue("ContainerSysDir " + containerSysDir.getAbsolutePath() + + " doesn't exist!!", containerSysDir.exists()); + } else { + // ContainerDir should not exist + Assert.assertFalse("ContainerDir " + containerDir.getAbsolutePath() + + " exists!!", containerDir.exists()); + Assert.assertFalse("ContainerSysDir " + containerSysDir.getAbsolutePath() + + " exists!!", containerSysDir.exists()); + } + } else { + if (shouldExist) { + // appDir should still exist + Assert.assertTrue("appDir " + appDir.getAbsolutePath() + + " doesn't exist!!", appDir.exists()); + Assert.assertTrue("appSysDir " + appSysDir.getAbsolutePath() + + " doesn't exist!!", appSysDir.exists()); + } else { + // appDir should not exist + Assert.assertFalse("appDir " + appDir.getAbsolutePath() + + " exists!!", appDir.exists()); + Assert.assertFalse("appSysDir " + appSysDir.getAbsolutePath() + + " exists!!", appSysDir.exists()); + } + } + } + + /** + * Run a container with delay, so that delayed deletion of resources can be + * tested + * @param debugDelaySec Delay container deletion with this number of seconds + * @return The application that ran + * @throws IOException + * @throws YarnException + * @throws InterruptedException + */ + private Map.Entry runContainerWithDebugDelay( + int debugDelaySec) + throws IOException, YarnException, InterruptedException { + delSrvc = new DeletionService(exec); + delSrvc.init(conf); + containerManager = createContainerManager(delSrvc); + containerManager.init(conf); + containerManager.start(); + + List list = new ArrayList<>(); + + // Create a delayed cleanup container that sleeps for a second + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setDeleteDelaySec(debugDelaySec); + containerLaunchContext.setCommands(Arrays.asList("sleep 1")); + + ContainerId cId = createContainerId(0); + ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context + .getNodeId(), + user, context.getContainerTokenSecretManager())); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + // Simulate RM closing the app after the containers finished + Application app = + containerManager.getContext().getApplications().get(appId); + List appIds = Arrays.asList(appId); + containerManager.handle(new CMgrCompletedAppsEvent(appIds, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + + return new AbstractMap.SimpleEntry(app, cId); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * @throws IOException + * @throws YarnException + * @throws InterruptedException + */ + @Test + public void testDelayedDeletionContainerOnly() + throws IOException, YarnException, InterruptedException { + final int debugDelaySec = 3; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay + (debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + // Verify that containers are kept as specified in the policy + Thread.sleep(debugDelaySec * 1000 / 2); + verifyContainerDir(appId, cId, true, true); + + // Verify that containers are get eventually deleted + Thread.sleep(debugDelaySec * 1000 / 2 + 1000); + verifyContainerDir(appId, cId, false, true); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * Test that application dir deletion does not override the specified delay + * @throws IOException + * @throws YarnException + * @throws InterruptedException + */ + @Test + public void testDelayedDeletion() + throws IOException, YarnException, InterruptedException { + final int debugDelaySec = 3; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay + (debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + int retry = 150; + while(--retry > 0 && + pair.getKey().getApplicationState()==ApplicationState.RUNNING) + Thread.sleep(10); + + // Verify that containers are kept as specified in the policy + Thread.sleep(debugDelaySec * 1000 / 2); + verifyContainerDir(appId, cId, true, false); + verifyContainerDir(appId, cId, true, true); + + // Verify that containers are get eventually deleted with the application + Thread.sleep(debugDelaySec * 1000 / 2 + 1000); + verifyContainerDir(appId, cId, false, false); + verifyContainerDir(appId, cId, false, true); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * Test that application dir deletion does not override the specified delay + * @throws IOException + * @throws YarnException + * @throws InterruptedException + */ + @Test + public void testDelayedKeep() throws IOException, YarnException, + InterruptedException { + final int debugDelaySec = KEEP_FOREVER; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay + (debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + int retry = 150; + while(--retry > 0 && + pair.getKey().getApplicationState()==ApplicationState.RUNNING) + Thread.sleep(10); + + // Verify that containers are kept indefinitely + Thread.sleep(1000); + verifyContainerDir(appId, cId, true, false); + verifyContainerDir(appId, cId, true, true); + } + @Test public void testContainerLaunchFromPreviousRM() throws IOException, InterruptedException, YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java index caebef7..b2a5375 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -120,7 +120,8 @@ public long getVCoresAllocatedForContainers() { protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // Don't do any deletions. if (shouldDeleteWait) { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 8feca21..6bc3812 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -99,4 +100,8 @@ public String getFlowVersion() { public long getFlowRunId() { return flowRunId; } + + public Date getDelayedDeletionTime() { + return null; + } }