diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 6d4bccd..302745d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -228,4 +228,20 @@ public static ContainerLaunchContext newInstance( @Unstable public abstract void setContainerRetryContext( ContainerRetryContext containerRetryContext); + + /** + * Get the delay before deleting resources to ease debugging of NM issues. + * @return delay before deleting resources in seconds. + */ + @Public + @Unstable + public abstract int getDeleteDelaySec(); + + /** + * Set the delay before deleting resources to ease debugging of NM issues. + * @param deleteDelaySec delay before deleting resources in seconds + */ + @Public + @Unstable + public abstract void setDeleteDelaySec(int deleteDelaySec); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3f84a23..1beb940 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -93,9 +93,10 @@ private static void addDeprecatedKeys() { public static final String YARN_PREFIX = "yarn."; - /** Delay before deleting resource to ease debugging of NM issues */ + /** Delay before deleting resource to ease debugging of NM issues. */ public static final String DEBUG_NM_DELETE_DELAY_SEC = YarnConfiguration.NM_PREFIX + "delete.debug-delay-sec"; + public static final int DEFAULT_DEBUG_NM_DELETE_DELAY_SEC = 0; public static final String NM_LOG_CONTAINER_DEBUG_INFO = YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fd..38392d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -528,6 +528,7 @@ message ContainerLaunchContextProto { repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; optional ContainerRetryContextProto container_retry_context = 7; + optional int32 delete_delay_sec = 8 [default = 0]; } message ContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 1efe541..3c21cd8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -59,6 +59,7 @@ private List commands = null; private Map applicationACLS = null; private ContainerRetryContext containerRetryContext = null; + private int deleteDelaySec = 0; public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); @@ -127,6 +128,9 @@ private void mergeLocalToBuilder() { builder.setContainerRetryContext( convertToProtoFormat(this.containerRetryContext)); } + if (this.deleteDelaySec != 0) { + builder.setDeleteDelaySec(this.deleteDelaySec); + } } private void mergeLocalToProto() { @@ -507,4 +511,31 @@ private ContainerRetryContextProto convertToProtoFormat( ContainerRetryContext t) { return ((ContainerRetryContextPBImpl)t).getProto(); } + /** + * Get the delay before deleting resources to ease debugging of NM issues. + * @return delay before deleting resources in seconds. + */ + public int getDeleteDelaySec() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.deleteDelaySec != 0) { + return this.deleteDelaySec; + } + if (!p.hasDeleteDelaySec()) { + return 0; + } + return p.getDeleteDelaySec(); + } + + /** + * Set the delay before deleting resources to ease debugging of NM issues. + * @param deleteDelaySec delay before deleting resources in seconds + */ + public void setDeleteDelaySec(int deleteDelaySec) { + maybeInitBuilder(); + if (deleteDelaySec == 0) { + builder.clearDeleteDelaySec(); + } + this.deleteDelaySec = deleteDelaySec; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index aac0af9..6c9dbf4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -53,9 +53,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +/** + * This service is responsible for asynchronously deleting files with + * an optional delay. + */ public class DeletionService extends AbstractService { + public static final int KEEP_FOREVER = -1; static final Log LOG = LogFactory.getLog(DeletionService.class); - private int debugDelay; + // Default debug delay on all containers specified by the configuration. + private int debugDelayDefault; private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; private static final FileContext lfs = getLfs(); @@ -78,37 +84,86 @@ public DeletionService(ContainerExecutor exec, NMStateStoreService stateStore) { super(DeletionService.class.getName()); this.exec = exec; - this.debugDelay = 0; + this.debugDelayDefault = 0; this.stateStore = stateStore; } - + /** * Delete the path(s) as this user. - * @param user The user to delete as, or the JVM user if null + * @param user the user to delete as, or the JVM user if null * @param subDir the sub directory name * @param baseDirs the base directories which contains the subDir's */ - public void delete(String user, Path subDir, Path... baseDirs) { + public void delete(String user, Path subDir, + Path... baseDirs) { + deleteWithDelay(user, 0, subDir, baseDirs); + } + + /** + * Delete the path(s) as this user with a custom retention policy. + * @param user the user to delete as, or the JVM user if null + * @param debugDelaySec seconds to wait before deleting. Use 0 as default + * @param subDir the sub directory name + * @param baseDirs the base directories which contains the subDir's + */ + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline - if (debugDelay != -1) { + if (debugDelayDefault != KEEP_FOREVER && + debugDelaySec != KEEP_FOREVER) { List baseDirList = null; if (baseDirs != null && baseDirs.length != 0) { baseDirList = Arrays.asList(baseDirs); } FileDeletionTask task = new FileDeletionTask(this, user, subDir, baseDirList); - recordDeletionTaskInStateStore(task); - sched.schedule(task, debugDelay, TimeUnit.SECONDS); + scheduleFileDeletionTask(task, debugDelaySec); } } - + + /** + * Schedules a task that deletes a file. Do not use this function unless + * you know what you are doing. Use delete() instead. + * @param fileDeletionTask task to schedule + */ public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { - if (debugDelay != -1) { - recordDeletionTaskInStateStore(fileDeletionTask); - sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); + scheduleFileDeletionTask(fileDeletionTask, 0); + } + + /** + * Schedules a task that deletes a file. Do not use this function unless + * you know what you are doing. Use deleteWithDelay() instead. + * @param fileDeletionTask the task to schedule + * @param debugDelaySec optional delay in seconds for debugging (default to 0) + */ + private void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask, + int debugDelaySec) { + if (debugDelaySec < -1) { + LOG.warn("delay time out of bounds. " + debugDelaySec); + debugDelaySec = 0; + } + + if (debugDelayDefault != KEEP_FOREVER && + debugDelaySec != KEEP_FOREVER) { + int effectiveDelay = Math.max(debugDelayDefault, debugDelaySec); + + recordDeletionTaskInStateStore(fileDeletionTask, effectiveDelay); + sched.schedule(fileDeletionTask, effectiveDelay, TimeUnit.SECONDS); } } - + + /** + * Verify the scheduled task count for testing purposes only. + * @return scheduled task count + */ + public long getTaskCount() { + if (sched != null) { + return sched.getTaskCount(); + } else { + return 0; + } + } + @Override protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder() @@ -118,7 +173,9 @@ protected void serviceInit(Configuration conf) throws Exception { sched = new HadoopScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); - debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); + debugDelayDefault = conf.getInt( + YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, + YarnConfiguration.DEFAULT_DEBUG_NM_DELETE_DELAY_SEC); } else { sched = new HadoopScheduledThreadPoolExecutor( YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); @@ -437,7 +494,8 @@ private int generateTaskId() { return taskId; } - private void recordDeletionTaskInStateStore(FileDeletionTask task) { + private void recordDeletionTaskInStateStore(FileDeletionTask task, + long debugDelay) { if (!stateStore.canRecover()) { // optimize the case where we aren't really recording return; @@ -452,7 +510,7 @@ private void recordDeletionTaskInStateStore(FileDeletionTask task) { // store successors first to ensure task IDs have been generated for them for (FileDeletionTask successor : successors) { - recordDeletionTaskInStateStore(successor); + recordDeletionTaskInStateStore(successor, debugDelay); } DeletionServiceDeleteTaskProto.Builder builder = @@ -495,4 +553,4 @@ public DeletionTaskRecoveryInfo(FileDeletionTask task, this.deletionTimestamp = deletionTimestamp; } } -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index aee0862..567870e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import java.util.Date; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -27,6 +28,8 @@ public interface Application extends EventHandler { + Date NEVER = new Date(Long.MAX_VALUE); + String getUser(); Map getContainers(); @@ -40,4 +43,6 @@ String getFlowVersion(); long getFlowRunId(); + + Date getDelayedDeletionTime(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index b9197c2..3d5d456 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import java.io.IOException; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import com.google.protobuf.ByteString; +import org.apache.commons.lang.time.DateUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.DataOutputBuffer; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; @@ -66,6 +69,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.yarn.server.nodemanager.DeletionService.KEEP_FOREVER; + /** * The state machine for the representation of an Application * within the NodeManager. @@ -92,6 +97,13 @@ new HashMap(); /** + * Containers may provide a retention policy on their local directories. + * We track the latest container deletion time here, so that we do not clean + * up the application directories before the container dirs are deleted. + */ + private Date delayedDeletionTime = null; + + /** * The timestamp when the log aggregation has started for this application. * Used to determine the age of application log files during log aggregation. * When logAggregationRentention policy is enabled, log files older than @@ -467,6 +479,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) { ApplicationContainerFinishedEvent containerEvent = (ApplicationContainerFinishedEvent) event; + app.calculateRetention(containerEvent.getContainerID()); if (null == app.containers.remove(containerEvent.getContainerID())) { LOG.warn("Removing unknown " + containerEvent.getContainerID() + " from application " + app.toString()); @@ -528,6 +541,7 @@ public ApplicationState transition(ApplicationImpl app, (ApplicationContainerFinishedEvent) event; LOG.info("Removing " + containerFinishEvent.getContainerID() + " from application " + app.toString()); + app.calculateRetention(containerFinishEvent.getContainerID()); app.containers.remove(containerFinishEvent.getContainerID()); if (app.containers.isEmpty()) { @@ -641,4 +655,45 @@ public String getFlowVersion() { public long getFlowRunId() { return flowContext == null ? 0L : flowContext.getFlowRunId(); } + + /** + * Estimate retention policy based on the container stop time. + * @param cId container ID that might extend the current retention period + */ + private void calculateRetention(ContainerId cId) { + Container c = containers.get(cId); + if (c != null) { + ContainerLaunchContext launchContext = c.getLaunchContext(); + if (launchContext != null) { + int deleteDelaySec = launchContext.getDeleteDelaySec(); + if (deleteDelaySec == KEEP_FOREVER) { + // Pick the maximum period to mark forever. + // This makes the logic below more simple + delayedDeletionTime = Application.NEVER; + } else if (deleteDelaySec > 0) { + // Wait for the container with the longest delay + Date newDeletionTime = DateUtils.addSeconds(new Date(), + launchContext.getDeleteDelaySec()); + delayedDeletionTime = delayedDeletionTime == null ? + newDeletionTime : + delayedDeletionTime.before(newDeletionTime) ? + newDeletionTime : + delayedDeletionTime; + } + } + } + } + + /** + * The time until we need to keep contianer directories. + * @return the latest delayed deletion time of any container in this + * app. The value Application.NEVER is reserved for keeping forever. + */ + @Override + public Date getDelayedDeletionTime() { + return delayedDeletionTime != null ? + new Date(delayedDeletionTime.getTime()) : + null; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4cd1acc..49c12ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -79,6 +80,7 @@ import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; @@ -559,6 +561,11 @@ private void handleCleanupContainerResources( String appIDStr = c.getContainerId().getApplicationAttemptId().getApplicationId() .toString(); + ContainerLaunchContext launchContext = c.getLaunchContext(); + int debugDelaySec = 0; + if (launchContext != null) { + debugDelaySec = launchContext.getDeleteDelaySec(); + } // Try deleting from good local dirs and full local dirs because a dir might // have gone bad while the app was running(disk full). In addition @@ -572,14 +579,14 @@ private void handleCleanupContainerResources( Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); Path containerDir = new Path(appDir, containerIDStr); - submitDirForDeletion(userName, containerDir); + submitDirForDeletion(userName, containerDir, debugDelaySec); // Delete the nmPrivate container-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr); - submitDirForDeletion(null, containerSysDir); + submitDirForDeletion(null, containerSysDir, debugDelaySec); } dispatcher.getEventHandler().handle( @@ -587,15 +594,15 @@ private void handleCleanupContainerResources( ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); } - private void submitDirForDeletion(String userName, Path dir) { + private void submitDirForDeletion(String userName, Path dir, + int debugDelaySec) { try { lfs.getFileStatus(dir); - delService.delete(userName, dir, new Path[] {}); + delService.deleteWithDelay(userName, debugDelaySec, dir); } catch (UnsupportedFileSystemException ue) { LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); } catch (IOException ie) { - // ignore - return; + LOG.info("Local dir " + dir + " cannot be deleted", ie); } } @@ -626,6 +633,23 @@ private void handleDestroyApplicationResources(Application application) { // Delete the application directories userName = application.getUser(); appIDStr = application.toString(); + // Each container may specify a retention policy. We should wait for the + // last one before deleting the application directories. + int debugDelaySec = 0; + Date applicationCleanupTime = application.getDelayedDeletionTime(); + if (applicationCleanupTime != null) { + Date now = new Date(); + if (applicationCleanupTime.equals(Application.NEVER)) { + // The maximum value is reserved for keep forever + debugDelaySec = DeletionService.KEEP_FOREVER; + } else if (now.before(applicationCleanupTime)) { + // There is a valid retention policy set on at least one container. + // Wait for the specified time rounded up to the nearest second. + debugDelaySec = + (int)(((applicationCleanupTime.getTime() - now.getTime()) + + 999) / 1000); + } + } for (String localDir : dirsHandler.getLocalDirsForCleanup()) { @@ -634,12 +658,12 @@ private void handleDestroyApplicationResources(Application application) { Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); - submitDirForDeletion(userName, appDir); + submitDirForDeletion(userName, appDir, debugDelaySec); // Delete the nmPrivate app-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); - submitDirForDeletion(null, appSysDir); + submitDirForDeletion(null, appSysDir, debugDelaySec); } // TODO: decrement reference counts of all resources associated with this diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index f72a606..2307040 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -156,6 +156,42 @@ public void testLocalFilesCleanup() throws InterruptedException, } @Override + public void testDelayedDeletionContainerOnly() + throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedDeletionContainerOnly(); + } + + @Override + public void testDelayedDeletion() + throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedDeletion(); + } + + @Override + public void testDelayedKeep() + throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedKeep(); + } + + @Override public void testContainerLaunchFromPreviousRM() throws InterruptedException, IOException, YarnException { // Don't run the test if the binary is not available. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 2e0bbe0..9800b95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -18,11 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -40,6 +35,12 @@ import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestDeletionService { private static final FileContext lfs = getLfs(); @@ -179,32 +180,34 @@ public void testRelativeDelete() throws Exception { } } + /** + * Test, if we can disable deleting files. + * @throws Exception any exception + */ @Test - public void testNoDelete() throws Exception { + public void testCustomDisableDelete() throws Exception { Random r = new Random(); long seed = r.nextLong(); r.setSeed(seed); - System.out.println("SEED: " + seed); List dirs = buildDirs(r, base, 20); createDirs(new Path("."), dirs); FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); Configuration conf = new Configuration(); - conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, -1); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, + DeletionService.KEEP_FOREVER); exec.setConf(conf); DeletionService del = new DeletionService(exec); try { del.init(conf); del.start(); for (Path p : dirs) { - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, - null); + del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p); } - int msecToWait = 20 * 1000; + + // Verify that nothing is delayed + assertTrue(del.getTaskCount() == 0); + // Verify that nothing was deleted for (Path p : dirs) { - while (msecToWait > 0 && lfs.util().exists(p)) { - Thread.sleep(100); - msecToWait -= 100; - } assertTrue(lfs.util().exists(p)); } } finally { @@ -212,6 +215,55 @@ public void testNoDelete() throws Exception { } } + /** + * Test, if we can apply a retention policy on the fly. + * @throws Exception An arbitrary exception + */ + @Test + public void testCustomRetentionPolicy() throws Exception { + int debugDelay = 1; + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + List dirs = buildDirs(r, base, 20); + createDirs(new Path("."), dirs); + FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); + Configuration conf = new Configuration(); + exec.setConf(conf); + DeletionService del = new DeletionService(exec); + try { + del.init(conf); + del.start(); + + long start = System.currentTimeMillis(); + + for (Path p : dirs) { + // Specify that the files should be kept for a second + del.deleteWithDelay( + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", + debugDelay, p); + } + + // Wait until all files are deleted + boolean allDeleted; + do { + allDeleted = true; + for (Path p : dirs) { + allDeleted = allDeleted && !lfs.util().exists(p); + } + Thread.sleep(10); + } while (!allDeleted && + System.currentTimeMillis() - start < 20000); + + long end = System.currentTimeMillis(); + long diff = end - start; + //Make sure we waited for the specified amount of time + assertTrue(diff >= debugDelay * 1000); + } finally { + del.stop(); + } + } + @Test public void testStopWithDelayedTasks() throws Exception { DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index d359c3d..bdff62d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -264,11 +264,12 @@ protected NMTokenIdentifier selectNMTokenIdentifier( protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String deleteAs, int debugDelaySec, + Path subDir, Path... baseDirs) { // Don't do any deletions. - LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + LOG.info("Psuedo delete: user - " + deleteAs + ", subDir - " + subDir + ", baseDirs - " + Arrays.asList(baseDirs)); - }; + } }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 0c083f2..51f04a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.apache.hadoop.yarn.server.nodemanager.DeletionService.KEEP_FOREVER; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; @@ -31,8 +32,10 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -92,6 +95,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; @@ -215,7 +219,7 @@ public void testContainerSetup() throws Exception { rsrc_alpha.setType(LocalResourceType.FILE); rsrc_alpha.setTimestamp(file.lastModified()); String destinationFile = "dest_file"; - Map localResources = + Map localResources = new HashMap(); localResources.put(destinationFile, rsrc_alpha); containerLaunchContext.setLocalResources(localResources); @@ -1109,22 +1113,22 @@ public void testLocalFilesCleanup() throws InterruptedException, + " doesn't exist!!", appDir.exists()); Assert.assertTrue("AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!", appSysDir.exists()); - for (File f : new File[] { containerDir, containerSysDir }) { + for (File f : new File[] {containerDir, containerSysDir}) { Assert.assertFalse(f.getAbsolutePath() + " exists!!", f.exists()); } Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", targetFile.exists()); // Simulate RM sending an AppFinish event. - containerManager.handle(new CMgrCompletedAppsEvent(Arrays - .asList(new ApplicationId[] { appId }), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); + containerManager.handle(new CMgrCompletedAppsEvent(Collections. + singletonList(appId), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); BaseContainerManagerTest.waitForApplicationState(containerManager, cId.getApplicationAttemptId().getApplicationId(), ApplicationState.FINISHED); // Now ascertain that the resources are localised correctly. - for (File f : new File[] { appDir, containerDir, appSysDir, + for (File f : new File[] {appDir, containerDir, appSysDir, containerSysDir }) { // Wait for deletion. Deletion can happen long after AppFinish because of // the async DeletionService @@ -1143,6 +1147,259 @@ public void testLocalFilesCleanup() throws InterruptedException, targetFile.exists()); } + /** + * Verify whether the container dir exists or not. + * @param message description of what is being tested + * @param appIDStr application ID + * @param containerIDStr container ID + * @param shouldExist test existence or absence + */ + private void verifyContainerDir(String message, String appIDStr, + String containerIDStr, boolean shouldExist) { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + File containerDir = new File(appDir, containerIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + File containerSysDir = new File(appSysDir, containerIDStr); + + if (shouldExist) { + // ContainerDir should still exist + Assert.assertTrue(message + " ContainerDir " + + containerDir.getAbsolutePath() + + " doesn't exist", containerDir.exists()); + Assert.assertTrue(message + " ContainerSysDir " + containerSysDir + .getAbsolutePath() + + " doesn't exist", containerSysDir.exists()); + } else { + // ContainerDir should not exist + Assert.assertFalse(message + " ContainerDir " + + containerDir.getAbsolutePath() + + " exists", containerDir.exists()); + Assert.assertFalse(message + " ContainerSysDir " + + containerSysDir.getAbsolutePath() + + " exists", containerSysDir.exists()); + } + } + + /** + * Verify whether the application dir exists or not. + * @param message description of what is being tested + * @param appIDStr application ID + * @param shouldExist test existence or absence + */ + private void verifyAppDir(String message, String appIDStr, + boolean shouldExist) { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + + if (shouldExist) { + // appDir should still exist + Assert.assertTrue(message + " appDir " + appDir.getAbsolutePath() + + " doesn't exist", appDir.exists()); + Assert.assertTrue("appSysDir " + appSysDir.getAbsolutePath() + + " doesn't exist", appSysDir.exists()); + } else { + // appDir should not exist + Assert.assertFalse(message + " appDir " + appDir.getAbsolutePath() + + " exists", appDir.exists()); + Assert.assertFalse("appSysDir " + appSysDir.getAbsolutePath() + + " exists", appSysDir.exists()); + } + } + + /** + * Wait for an application directory deleted or the timeout elapsed. + * @param appIDStr application id + * @param timeoutMs timeout in milliseconds + * @throws InterruptedException the thread wait was interrupted + */ + private void waitForApplicationDirDeleted(String appIDStr, long timeoutMs) + throws InterruptedException { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + + // wait for the deletion of the directory + long start = System.currentTimeMillis(); + while ((appDir.exists() || appSysDir.exists()) && + System.currentTimeMillis() - start < timeoutMs) { + Thread.sleep(10); + } + } + + /** + * Run a container with delay, so that delayed deletion of resources can be + * tested. + * @param debugDelaySec delay container deletion with this number of seconds + * @return the application that ran + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + private Map.Entry runContainerWithDebugDelay( + int debugDelaySec) + throws IOException, YarnException, InterruptedException { + delSrvc = new DeletionService(exec); + delSrvc.init(conf); + containerManager = createContainerManager(delSrvc); + containerManager.init(conf); + containerManager.start(); + + List list = new ArrayList<>(); + + // Create a delayed cleanup container that sleeps for a second + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setDeleteDelaySec(debugDelaySec); + containerLaunchContext.setCommands(Collections.singletonList("sleep 1")); + + ContainerId cId = createContainerId(0); + ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context + .getNodeId(), + user, context.getContainerTokenSecretManager())); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + // Simulate RM closing the app after the containers finished + Application app = + containerManager.getContext().getApplications().get(appId); + List appIds = Collections.singletonList(appId); + containerManager.handle(new CMgrCompletedAppsEvent(appIds, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + + return new AbstractMap.SimpleEntry<>(app, cId); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + @Test + public void testDelayedDeletionContainerOnly() + throws IOException, YarnException, InterruptedException { + final int debugDelaySec = 3; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay( + debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + // Verify that containers are kept as specified in the policy + Thread.sleep(debugDelaySec * 1000 / 2); + verifyContainerDir("Verify that the container directory is still alive by" + + " deletion policy", appId, cId, true); + + // Verify that containers are get eventually deleted + Thread.sleep(debugDelaySec * 1000 / 2); + waitForApplicationDirDeleted(appId, 5000); + verifyContainerDir("Verify that the container directory is deleted by " + + "deletion policy", appId, cId, false); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * Test that application dir deletion does not override the specified delay + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + @Test + public void testDelayedDeletion() + throws IOException, YarnException, InterruptedException { + final int debugDelaySec = 3; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay( + debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + int retry = 150; + while(--retry > 0 && + pair.getKey().getApplicationState()==ApplicationState.RUNNING) { + Thread.sleep(10); + } + + // Verify that containers are kept as specified in the policy + Thread.sleep(debugDelaySec * 1000 / 2); + verifyAppDir("Verify that the application directory is still alive by " + + "policy", appId, true); + verifyContainerDir("Verify that the container directory is still alive by" + + " policy", appId, cId, true); + + // Verify that containers are get eventually deleted with the application + Thread.sleep(debugDelaySec * 1000 / 2); + waitForApplicationDirDeleted(appId, 5000); + verifyContainerDir("Verify that the container directory is deleted by " + + "policy", appId, cId, false); + verifyAppDir("Verify that the application directory is deleted by " + + "policy", appId, false); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * Test that application dir deletion does not override the specified delay + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + @Test + public void testDelayedKeep() throws IOException, YarnException, + InterruptedException { + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay( + KEEP_FOREVER); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + int retry = 150; + while(--retry > 0 && + pair.getKey().getApplicationState()==ApplicationState.RUNNING) { + Thread.sleep(10); + } + + // Verify that containers are kept indefinitely + Thread.sleep(200); + verifyAppDir("Verify that the application is kept forever", appId, true); + verifyContainerDir("Verify that the container is kept forever", appId, + cId, true); + } + @Test public void testContainerLaunchFromPreviousRM() throws IOException, InterruptedException, YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java index caebef7..b2a5375 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -120,7 +120,8 @@ public long getVCoresAllocatedForContainers() { protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // Don't do any deletions. if (shouldDeleteWait) { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 8feca21..6bc3812 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -99,4 +100,8 @@ public String getFlowVersion() { public long getFlowRunId() { return flowRunId; } + + public Date getDelayedDeletionTime() { + return null; + } }