diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java index 760e251..90d8d79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java @@ -241,7 +241,29 @@ * Comma separate list of directories that the container should use for * logging. */ - LOG_DIRS("LOG_DIRS"); + LOG_DIRS("LOG_DIRS"), + + /** + * $DEBUG_DELETE_DELAY + * Number of seconds after an application finishes before the nodemanager's + * DeletionService will delete the application's localized file directory + * and log directory. + * + * To diagnose Yarn application problems, set this property's value large + * enough (for example, to 600 = 10 minutes) to permit examination of these + * directories. + + * The configuration is appplication request specific. It has effect only + * if specified as an environment variable on the client in the container + * launch context. Use yarn.nodemanager.delete.debug-delay-sec to specify + * a service wide delay. + + * The maximum of yarn.nodemanager.delete.debug-delay-sec and this value + * will be the effective delay. + * Use yarn.nodemanager.delete.max-debug-delay-sec to put a cap on how + * much an application can delay the deletion of the files. + */ + DEBUG_DELETE_DELAY("DEBUG_DELETE_DELAY"); private final String variable; private Environment(String variable) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 6d4bccd..1f116bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -228,4 +229,28 @@ public static ContainerLaunchContext newInstance( @Unstable public abstract void setContainerRetryContext( ContainerRetryContext containerRetryContext); + + /** + * Delayed deletion time of directories in seconds. + * @return the specified debug delay in the environment + */ + @Public + @Unstable + public int getDeleteDelaySec() { + int environmentDelay = 0; + Map environment = getEnvironment(); + if (environment != null) { + try { + String value = environment + .get(ApplicationConstants.Environment.DEBUG_DELETE_DELAY.key()); + if (value != null) { + environmentDelay = Integer.parseInt(value); + environmentDelay = Math.max(environmentDelay, 0); + } + } catch (NullPointerException | NumberFormatException e) { + environmentDelay = 0; + } + } + return environmentDelay; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1fd25a7..9093980 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -93,9 +93,19 @@ private static void addDeprecatedKeys() { public static final String YARN_PREFIX = "yarn."; - /** Delay before deleting resource to ease debugging of NM issues */ + /** Delay before deleting resource to ease debugging of NM issues. */ public static final String DEBUG_NM_DELETE_DELAY_SEC = - YarnConfiguration.NM_PREFIX + "delete.debug-delay-sec"; + YarnConfiguration.NM_PREFIX + "delete.debug-delay-sec"; + public static final int DEFAULT_DEBUG_NM_DELETE_DELAY_SEC = 0; + + /** Maximum delay before deleting the resource. + * This setting limits YarnConfiguration.DEFAULT_DEBUG_NM_DELETE_DELAY_SEC + * and any container specific delay as well for reliability and security*/ + public static final String DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC = + YarnConfiguration.NM_PREFIX + + "delete.max-per-application-debug-delay-sec"; + public static final int + DEFAULT_DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC = 0; public static final String NM_LOG_CONTAINER_DEBUG_INFO = YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e890b40..fabc139 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1008,18 +1008,22 @@ - Number of seconds after an application finishes before the nodemanager's + Number of seconds after an application finishes before the nodemanager's DeletionService will delete the application's localized file directory and log directory. - + To diagnose Yarn application problems, set this property's value large enough (for example, to 600 = 10 minutes) to permit examination of these - directories. After changing the property's value, you must restart the - nodemanager in order for it to have an effect. + directories. + + After changing the property's value, you must restart the nodemanager + in order for it to have an effect. It applies to all containers. + To find a setting that affects only a single application run please + refer to Environment.DELETE_DELAY The roots of Yarn applications' work directories is configurable with the yarn.nodemanager.local-dirs property (see below), and the roots - of the Yarn applications' log directories is configurable with the + of the Yarn applications' log directories is configurable with the yarn.nodemanager.log-dirs property (see also below). yarn.nodemanager.delete.debug-delay-sec @@ -1027,6 +1031,21 @@ + + Maximum mumber of seconds after an application finishes before the + nodemanager's DeletionService will delete the application's localized + file directory and log directory. This value is a security setting to + prevent unreliable or malicious clients keep files arbitrarily. + + This is a maximum limit on the following values + yarn.nodemanager.delete.debug-delay-sec + Environment.DELETE_DELAY + + yarn.nodemanager.delete.max-debug-delay-sec + 600 + + + Keytab for NM. yarn.nodemanager.keytab /etc/krb5.keytab diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index aac0af9..1be2b7d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -53,9 +54,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +/** + * This service is responsible for asynchronously deleting files with + * an optional delay. + */ public class DeletionService extends AbstractService { static final Log LOG = LogFactory.getLog(DeletionService.class); - private int debugDelay; + // Default debug delay on all containers specified by the configuration. + private int debugDelayDefault; + private int maxDebugDelay; private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; private static final FileContext lfs = getLfs(); @@ -78,47 +85,127 @@ public DeletionService(ContainerExecutor exec, NMStateStoreService stateStore) { super(DeletionService.class.getName()); this.exec = exec; - this.debugDelay = 0; + this.debugDelayDefault = 0; this.stateStore = stateStore; } - + /** * Delete the path(s) as this user. - * @param user The user to delete as, or the JVM user if null + * @param user the user to delete as, or the JVM user if null + * @param subDir the sub directory name + * @param baseDirs the base directories which contains the subDir's + */ + public void delete(String user, Path subDir, + Path... baseDirs) { + deleteWithDelay(user, 0, subDir, baseDirs); + } + + /** + * Delete the path(s) as this user with a custom retention policy. + * @param user the user to delete as, or the JVM user if null + * @param debugDelaySec seconds to wait before deleting. Use 0 as default * @param subDir the sub directory name * @param baseDirs the base directories which contains the subDir's */ - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline - if (debugDelay != -1) { - List baseDirList = null; - if (baseDirs != null && baseDirs.length != 0) { - baseDirList = Arrays.asList(baseDirs); - } - FileDeletionTask task = - new FileDeletionTask(this, user, subDir, baseDirList); - recordDeletionTaskInStateStore(task); - sched.schedule(task, debugDelay, TimeUnit.SECONDS); + List baseDirList = null; + if (baseDirs != null && baseDirs.length != 0) { + baseDirList = Arrays.asList(baseDirs); } + FileDeletionTask task = + new FileDeletionTask(this, user, subDir, baseDirList); + scheduleFileDeletionTask(task, debugDelaySec); } - - public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { - if (debugDelay != -1) { - recordDeletionTaskInStateStore(fileDeletionTask); - sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); + + /** + * Verifies and limits the user specified debug delay. + * @param debugDelaySec requested delay from the client + * @return effective delay based on configuration + */ + int getEffectiveDelaySec(int debugDelaySec) { + int limitedDelay = 0; + + if (debugDelaySec < 0) { + LOG.warn("Delay time out of bounds. " + debugDelaySec); + limitedDelay = 0; + } else { + limitedDelay = debugDelaySec; } + + if (limitedDelay > maxDebugDelay) { + LOG.info(String.format("Limiting debug delay %d to %d", + limitedDelay, maxDebugDelay)); + limitedDelay = Math.min(limitedDelay, maxDebugDelay); + } + + int effectiveDelay; + effectiveDelay = Math.max(debugDelayDefault, limitedDelay); + + return effectiveDelay; } - + + /** + * Schedules a task that deletes a file. Do not use this function unless + * you know what you are doing. Use delete() instead. + * @param fileDeletionTask task to schedule + */ + public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { + scheduleFileDeletionTask(fileDeletionTask, 0); + } + + /** + * Schedules a task that deletes a file. Do not use this function unless + * you know what you are doing. Use deleteWithDelay() instead. + * @param fileDeletionTask the task to schedule + * @param debugDelaySec optional delay in seconds for debugging (default to 0) + */ + private void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask, + int debugDelaySec) { + int effectiveDelay = getEffectiveDelaySec(debugDelaySec); + recordDeletionTaskInStateStore(fileDeletionTask, effectiveDelay); + sched.schedule(fileDeletionTask, effectiveDelay, TimeUnit.SECONDS); + } + + /** + * Peek the beginning of the queue. + * @return scheduled task count + */ + @VisibleForTesting + ScheduledThreadPoolExecutor getSched() { + return sched; + } + @Override protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("DeletionService #%d") - .build(); + .setNameFormat("DeletionService #%d") + .build(); if (conf != null) { sched = new HadoopScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); - debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); + debugDelayDefault = conf.getInt( + YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, + YarnConfiguration.DEFAULT_DEBUG_NM_DELETE_DELAY_SEC); + maxDebugDelay = conf.getInt( + YarnConfiguration.DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC, + YarnConfiguration.DEFAULT_DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC); + + // Check input + if (debugDelayDefault < 0){ + throw new YarnException( + String.format("Invalid %s=%d should be greather than or equal to 0", + YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, + debugDelayDefault)); + } + if (maxDebugDelay < 0){ + throw new YarnException( + String.format("Invalid %s=%d should be greater than or equal to 0", + YarnConfiguration.DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC, + maxDebugDelay)); + } } else { sched = new HadoopScheduledThreadPoolExecutor( YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); @@ -437,7 +524,8 @@ private int generateTaskId() { return taskId; } - private void recordDeletionTaskInStateStore(FileDeletionTask task) { + private void recordDeletionTaskInStateStore(FileDeletionTask task, + long debugDelay) { if (!stateStore.canRecover()) { // optimize the case where we aren't really recording return; @@ -452,7 +540,7 @@ private void recordDeletionTaskInStateStore(FileDeletionTask task) { // store successors first to ensure task IDs have been generated for them for (FileDeletionTask successor : successors) { - recordDeletionTaskInStateStore(successor); + recordDeletionTaskInStateStore(successor, debugDelay); } DeletionServiceDeleteTaskProto.Builder builder = @@ -495,4 +583,4 @@ public DeletionTaskRecoveryInfo(FileDeletionTask task, this.deletionTimestamp = deletionTimestamp; } } -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index aee0862..46ec648 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import java.util.Date; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -40,4 +41,6 @@ String getFlowVersion(); long getFlowRunId(); + + Date getDelayedDeletionTime(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index b9197c2..299530f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import java.io.IOException; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import com.google.protobuf.ByteString; +import org.apache.commons.lang.time.DateUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.DataOutputBuffer; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; @@ -92,6 +95,13 @@ new HashMap(); /** + * Containers may provide a retention policy on their local directories. + * We track the latest container deletion time here, so that we do not clean + * up the application directories before the container dirs are deleted. + */ + private Date delayedDeletionTime = null; + + /** * The timestamp when the log aggregation has started for this application. * Used to determine the age of application log files during log aggregation. * When logAggregationRentention policy is enabled, log files older than @@ -467,6 +477,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) { ApplicationContainerFinishedEvent containerEvent = (ApplicationContainerFinishedEvent) event; + app.calculateRetention(containerEvent.getContainerID()); if (null == app.containers.remove(containerEvent.getContainerID())) { LOG.warn("Removing unknown " + containerEvent.getContainerID() + " from application " + app.toString()); @@ -528,6 +539,7 @@ public ApplicationState transition(ApplicationImpl app, (ApplicationContainerFinishedEvent) event; LOG.info("Removing " + containerFinishEvent.getContainerID() + " from application " + app.toString()); + app.calculateRetention(containerFinishEvent.getContainerID()); app.containers.remove(containerFinishEvent.getContainerID()); if (app.containers.isEmpty()) { @@ -641,4 +653,41 @@ public String getFlowVersion() { public long getFlowRunId() { return flowContext == null ? 0L : flowContext.getFlowRunId(); } + + /** + * Estimate retention policy based on the container stop time. + * @param cId container ID that might extend the current retention period + */ + private void calculateRetention(ContainerId cId) { + Container c = containers.get(cId); + if (c != null) { + ContainerLaunchContext launchContext = c.getLaunchContext(); + if (launchContext != null) { + int deleteDelaySec = launchContext.getDeleteDelaySec(); + if (deleteDelaySec > 0) { + // Wait for the container with the longest delay + Date newDeletionTime = DateUtils.addSeconds(new Date(), + launchContext.getDeleteDelaySec()); + delayedDeletionTime = delayedDeletionTime == null ? + newDeletionTime : + delayedDeletionTime.before(newDeletionTime) ? + newDeletionTime : + delayedDeletionTime; + } + } + } + } + + /** + * The time until we need to keep contianer directories. + * @return the latest delayed deletion time of any container in this + * app. The value Application.NEVER is reserved for keeping forever. + */ + @Override + public Date getDelayedDeletionTime() { + return delayedDeletionTime != null ? + new Date(delayedDeletionTime.getTime()) : + null; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4cd1acc..7511133 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -79,6 +80,7 @@ import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; @@ -559,6 +561,11 @@ private void handleCleanupContainerResources( String appIDStr = c.getContainerId().getApplicationAttemptId().getApplicationId() .toString(); + ContainerLaunchContext launchContext = c.getLaunchContext(); + int debugDelaySec = 0; + if (launchContext != null) { + debugDelaySec = launchContext.getDeleteDelaySec(); + } // Try deleting from good local dirs and full local dirs because a dir might // have gone bad while the app was running(disk full). In addition @@ -572,14 +579,14 @@ private void handleCleanupContainerResources( Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); Path containerDir = new Path(appDir, containerIDStr); - submitDirForDeletion(userName, containerDir); + submitDirForDeletion(userName, containerDir, debugDelaySec); // Delete the nmPrivate container-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr); - submitDirForDeletion(null, containerSysDir); + submitDirForDeletion(null, containerSysDir, debugDelaySec); } dispatcher.getEventHandler().handle( @@ -587,15 +594,15 @@ private void handleCleanupContainerResources( ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); } - private void submitDirForDeletion(String userName, Path dir) { + private void submitDirForDeletion(String userName, Path dir, + int debugDelaySec) { try { lfs.getFileStatus(dir); - delService.delete(userName, dir, new Path[] {}); + delService.deleteWithDelay(userName, debugDelaySec, dir); } catch (UnsupportedFileSystemException ue) { LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); } catch (IOException ie) { - // ignore - return; + LOG.info("Local dir " + dir + " cannot be deleted", ie); } } @@ -626,6 +633,20 @@ private void handleDestroyApplicationResources(Application application) { // Delete the application directories userName = application.getUser(); appIDStr = application.toString(); + // Each container may specify a retention policy. We should wait for the + // last one before deleting the application directories. + int debugDelaySec = 0; + Date applicationCleanupTime = application.getDelayedDeletionTime(); + if (applicationCleanupTime != null) { + Date now = new Date(); + if (now.before(applicationCleanupTime)) { + // There is a valid retention policy set on at least one container. + // Wait for the specified time rounded up to the nearest second. + debugDelaySec = + (int)(((applicationCleanupTime.getTime() - now.getTime()) + + 999) / 1000); + } + } for (String localDir : dirsHandler.getLocalDirsForCleanup()) { @@ -634,12 +655,12 @@ private void handleDestroyApplicationResources(Application application) { Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); - submitDirForDeletion(userName, appDir); + submitDirForDeletion(userName, appDir, debugDelaySec); // Delete the nmPrivate app-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); - submitDirForDeletion(null, appSysDir); + submitDirForDeletion(null, appSysDir, debugDelaySec); } // TODO: decrement reference counts of all resources associated with this diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index f72a606..2307040 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -156,6 +156,42 @@ public void testLocalFilesCleanup() throws InterruptedException, } @Override + public void testDelayedDeletionContainerOnly() + throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedDeletionContainerOnly(); + } + + @Override + public void testDelayedDeletion() + throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedDeletion(); + } + + @Override + public void testDelayedKeep() + throws IOException, YarnException, InterruptedException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testLocalFilesCleanup"); + super.testDelayedKeep(); + } + + @Override public void testContainerLaunchFromPreviousRM() throws InterruptedException, IOException, YarnException { // Don't run the test if the binary is not available. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 2e0bbe0..68ccaba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -33,34 +33,40 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +/** + * Unit tests for deletion service. + */ public class TestDeletionService { - private static final FileContext lfs = getLfs(); - private static final FileContext getLfs() { + private static final FileContext LFS = getLfs(); + private static FileContext getLfs() { try { return FileContext.getLocalFSFileContext(); } catch (UnsupportedFileSystemException e) { throw new RuntimeException(e); } } - private static final Path base = - lfs.makeQualified(new Path("target", TestDeletionService.class.getName())); + private static final Path BASE = + LFS.makeQualified(new Path("target", + TestDeletionService.class.getName())); @AfterClass public static void removeBase() throws IOException { - lfs.delete(base, true); + LFS.delete(BASE, true); } - public List buildDirs(Random r, Path root, int numpaths) + private List buildDirs(Random r, Path root, int numpaths) throws IOException { - ArrayList ret = new ArrayList(); + ArrayList ret = new ArrayList<>(); for (int i = 0; i < numpaths; ++i) { Path p = root; long name = r.nextLong(); @@ -73,13 +79,14 @@ public static void removeBase() throws IOException { return ret; } - public void createDirs(Path base, List dirs) throws IOException { + private void createDirs(Path myBase, List dirs) throws IOException { for (Path dir : dirs) { - lfs.mkdir(new Path(base, dir), null, true); + LFS.mkdir(new Path(myBase, dir), null, true); } } - static class FakeDefaultContainerExecutor extends DefaultContainerExecutor { + private static class FakeDefaultContainerExecutor extends + DefaultContainerExecutor { @Override public void deleteAsUser(DeletionAsUserContext ctx) throws IOException, InterruptedException { @@ -113,7 +120,7 @@ public void testAbsDelete() throws Exception { long seed = r.nextLong(); r.setSeed(seed); System.out.println("SEED: " + seed); - List dirs = buildDirs(r, base, 20); + List dirs = buildDirs(r, BASE, 20); createDirs(new Path("."), dirs); FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); Configuration conf = new Configuration(); @@ -124,16 +131,16 @@ public void testAbsDelete() throws Exception { try { for (Path p : dirs) { del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", - p, null); + p); } int msecToWait = 20 * 1000; for (Path p : dirs) { - while (msecToWait > 0 && lfs.util().exists(p)) { + while (msecToWait > 0 && LFS.util().exists(p)) { Thread.sleep(100); msecToWait -= 100; } - assertFalse(lfs.util().exists(p)); + assertFalse(LFS.util().exists(p)); } } finally { del.stop(); @@ -146,19 +153,19 @@ public void testRelativeDelete() throws Exception { long seed = r.nextLong(); r.setSeed(seed); System.out.println("SEED: " + seed); - List baseDirs = buildDirs(r, base, 4); + List baseDirs = buildDirs(r, BASE, 4); createDirs(new Path("."), baseDirs); List content = buildDirs(r, new Path("."), 10); for (Path b : baseDirs) { createDirs(b, content); } DeletionService del = - new DeletionService(new FakeDefaultContainerExecutor()); + new DeletionService(new FakeDefaultContainerExecutor()); try { del.init(new Configuration()); del.start(); for (Path p : content) { - assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); + assertTrue(LFS.util().exists(new Path(baseDirs.get(0), p))); del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, baseDirs.toArray(new Path[4])); } @@ -167,11 +174,11 @@ public void testRelativeDelete() throws Exception { for (Path p : baseDirs) { for (Path q : content) { Path fp = new Path(p, q); - while (msecToWait > 0 && lfs.util().exists(fp)) { + while (msecToWait > 0 && LFS.util().exists(fp)) { Thread.sleep(100); msecToWait -= 100; } - assertFalse(lfs.util().exists(fp)); + assertFalse(LFS.util().exists(fp)); } } } finally { @@ -179,42 +186,102 @@ public void testRelativeDelete() throws Exception { } } + /** + * Test, if we can disable deleting files. + * @throws Exception any exception + */ @Test - public void testNoDelete() throws Exception { + public void testCustomDisableDelete() throws Exception { + final int almostForever = Integer.MAX_VALUE; + final int fileCount = 20; Random r = new Random(); long seed = r.nextLong(); r.setSeed(seed); - System.out.println("SEED: " + seed); - List dirs = buildDirs(r, base, 20); + List dirs = buildDirs(r, BASE, fileCount); createDirs(new Path("."), dirs); FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); Configuration conf = new Configuration(); - conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, -1); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, almostForever); + conf.setInt(YarnConfiguration.DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC, + Integer.MAX_VALUE); exec.setConf(conf); DeletionService del = new DeletionService(exec); try { del.init(conf); del.start(); for (Path p : dirs) { - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, - null); + del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p); } - int msecToWait = 20 * 1000; + + // Verify that all deletions are delayed + Thread.sleep(200); + assertEquals(fileCount, + del.getSched().getTaskCount() - + del.getSched().getCompletedTaskCount()); + // Verify that nothing was deleted for (Path p : dirs) { - while (msecToWait > 0 && lfs.util().exists(p)) { - Thread.sleep(100); - msecToWait -= 100; - } - assertTrue(lfs.util().exists(p)); + assertTrue(LFS.util().exists(p)); } } finally { del.stop(); } } + /** + * Test, if we can apply a retention policy on the fly. + * @throws Exception An arbitrary exception + */ + @Test + public void testCustomRetentionPolicy() throws Exception { + int debugDelay = 1; + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + List dirs = buildDirs(r, BASE, 20); + createDirs(new Path("."), dirs); + FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration + .DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC, debugDelay); + exec.setConf(conf); + DeletionService del = new DeletionService(exec); + try { + del.init(conf); + del.start(); + + long start = System.currentTimeMillis(); + + for (Path p : dirs) { + // Specify that the files should be kept for a second + del.deleteWithDelay( + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", + debugDelay, p); + } + + // Wait until all files are deleted + boolean allDeleted; + do { + allDeleted = true; + for (Path p : dirs) { + allDeleted = allDeleted && !LFS.util().exists(p); + } + Thread.sleep(10); + } while (!allDeleted && + System.currentTimeMillis() - start < 20000); + + long end = System.currentTimeMillis(); + long diff = end - start; + //Make sure we waited for the specified amount of time + assertTrue(diff >= debugDelay * 1000); + } finally { + del.stop(); + } + } + @Test public void testStopWithDelayedTasks() throws Exception { - DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class)); + DeletionService del = + new DeletionService(Mockito.mock(ContainerExecutor.class)); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60); try { @@ -241,7 +308,7 @@ public void testFileDeletionTaskDependency() throws Exception { long seed = r.nextLong(); r.setSeed(seed); System.out.println("SEED: " + seed); - List dirs = buildDirs(r, base, 2); + List dirs = buildDirs(r, BASE, 2); createDirs(new Path("."), dirs); // first we will try to delete sub directories which are present. This @@ -250,10 +317,10 @@ public void testFileDeletionTaskDependency() throws Exception { FileDeletionTask dependentDeletionTask = del.createFileDeletionTask(null, dirs.get(0), new Path[] {}); - List deletionTasks = new ArrayList(); + List deletionTasks = new ArrayList<>(); for (Path subDir : subDirs) { FileDeletionTask deletionTask = - del.createFileDeletionTask(null, null, new Path[] { subDir }); + del.createFileDeletionTask(null, null, new Path[] {subDir}); deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } @@ -262,11 +329,11 @@ public void testFileDeletionTaskDependency() throws Exception { } int msecToWait = 20 * 1000; - while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) { + while (msecToWait > 0 && (LFS.util().exists(dirs.get(0)))) { Thread.sleep(100); msecToWait -= 100; } - assertFalse(lfs.util().exists(dirs.get(0))); + assertFalse(LFS.util().exists(dirs.get(0))); // Now we will try to delete sub directories; one of the deletion task we @@ -276,10 +343,10 @@ public void testFileDeletionTaskDependency() throws Exception { dependentDeletionTask = del.createFileDeletionTask(null, dirs.get(1), new Path[] {}); - deletionTasks = new ArrayList(); + deletionTasks = new ArrayList<>(); for (Path subDir : subDirs) { FileDeletionTask deletionTask = - del.createFileDeletionTask(null, null, new Path[] { subDir }); + del.createFileDeletionTask(null, null, new Path[] {subDir}); deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } @@ -291,12 +358,12 @@ public void testFileDeletionTaskDependency() throws Exception { msecToWait = 20 * 1000; while (msecToWait > 0 - && (lfs.util().exists(subDirs.get(0)) || lfs.util().exists( + && (LFS.util().exists(subDirs.get(0)) || LFS.util().exists( subDirs.get(1)))) { Thread.sleep(100); msecToWait -= 100; } - assertTrue(lfs.util().exists(dirs.get(1))); + assertTrue(LFS.util().exists(dirs.get(1))); } finally { del.stop(); } @@ -308,7 +375,7 @@ public void testRecovery() throws Exception { long seed = r.nextLong(); r.setSeed(seed); System.out.println("SEED: " + seed); - List baseDirs = buildDirs(r, base, 4); + List baseDirs = buildDirs(r, BASE, 4); createDirs(new Path("."), baseDirs); List content = buildDirs(r, new Path("."), 10); for (Path b : baseDirs) { @@ -321,12 +388,12 @@ public void testRecovery() throws Exception { stateStore.init(conf); stateStore.start(); DeletionService del = - new DeletionService(new FakeDefaultContainerExecutor(), stateStore); + new DeletionService(new FakeDefaultContainerExecutor(), stateStore); try { del.init(conf); del.start(); for (Path p : content) { - assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); + assertTrue(LFS.util().exists(new Path(baseDirs.get(0), p))); del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, baseDirs.toArray(new Path[4])); } @@ -343,11 +410,11 @@ public void testRecovery() throws Exception { for (Path p : baseDirs) { for (Path q : content) { Path fp = new Path(p, q); - while (msecToWait > 0 && lfs.util().exists(fp)) { + while (msecToWait > 0 && LFS.util().exists(fp)) { Thread.sleep(100); msecToWait -= 100; } - assertFalse(lfs.util().exists(fp)); + assertFalse(LFS.util().exists(fp)); } } } finally { @@ -355,4 +422,83 @@ public void testRecovery() throws Exception { stateStore.close(); } } + + private DeletionService createMockDeletionService(int defaultDelay, + int maxDelay) + throws Exception { + FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, + defaultDelay); + conf.setInt(YarnConfiguration.DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC, + maxDelay); + exec.setConf(conf); + DeletionService del = new DeletionService(exec); + del.serviceInit(conf); + return del; + } + + /** + * Assert that a configuration results in an exception thrown. + * @param defaultDelay default delay from configuration + * @param maxDelay maximum delay from configuration + */ + private void assertConfigurationError(int defaultDelay, + int maxDelay) { + try { + createMockDeletionService(defaultDelay, maxDelay); + Assert.fail("Ignored invalid configuration"); + } catch (Exception e) { + Assert.assertTrue(e instanceof YarnException); + } + } + + /** + * Test that we catch invalid configuration errors. + * @throws Exception unexpected exceptions fail the test + */ + @Test + public void testInvalidConfigurationErrors() throws Exception { + assertConfigurationError(-1000, 0); + assertConfigurationError(0, -1000); + } + + /** + * Check that the expected delay is calculated. + * @param expectedDelay expected value + * @param defaultDelay default value from configuration + * @param customDelay custom value from client + * @param maxDelay maximum value from configuration + * @throws Exception unknown error + */ + private void checkDelay(int expectedDelay, + int defaultDelay, + int customDelay, + int maxDelay) + throws Exception { + Assert.assertEquals( + expectedDelay, + createMockDeletionService(defaultDelay, maxDelay) + .getEffectiveDelaySec(customDelay)); + } + + /** + * Test that we calculate the correct effective delay. + * @throws Exception unexpected exceptions fail the test + */ + @Test + public void testEffectiveDelay() throws Exception { + final int forever = Integer.MAX_VALUE; + // expected, default, custom, max + checkDelay(0, 0, 0, forever); + checkDelay(0, 0, -2, forever); + checkDelay(forever, forever, 0, forever); + checkDelay(forever, forever, forever, forever); + checkDelay(10, 0, 20, 10); + checkDelay(10, 0, forever, 10); + checkDelay(20, 10, 20, forever); + checkDelay(20, 20, 10, forever); + checkDelay(20, 20, 0, forever); + checkDelay(20, 0, 20, forever); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 6dd1ac7..1200109 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -264,11 +264,12 @@ protected NMTokenIdentifier selectNMTokenIdentifier( protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String deleteAs, int debugDelaySec, + Path subDir, Path... baseDirs) { // Don't do any deletions. - LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + LOG.info("Psuedo delete: user - " + deleteAs + ", subDir - " + subDir + ", baseDirs - " + Arrays.asList(baseDirs)); - }; + } }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 0c083f2..8915048 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -31,8 +31,10 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -47,6 +49,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; @@ -92,6 +95,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; @@ -215,7 +219,7 @@ public void testContainerSetup() throws Exception { rsrc_alpha.setType(LocalResourceType.FILE); rsrc_alpha.setTimestamp(file.lastModified()); String destinationFile = "dest_file"; - Map localResources = + Map localResources = new HashMap(); localResources.put(destinationFile, rsrc_alpha); containerLaunchContext.setLocalResources(localResources); @@ -1109,22 +1113,22 @@ public void testLocalFilesCleanup() throws InterruptedException, + " doesn't exist!!", appDir.exists()); Assert.assertTrue("AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!", appSysDir.exists()); - for (File f : new File[] { containerDir, containerSysDir }) { + for (File f : new File[] {containerDir, containerSysDir}) { Assert.assertFalse(f.getAbsolutePath() + " exists!!", f.exists()); } Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", targetFile.exists()); // Simulate RM sending an AppFinish event. - containerManager.handle(new CMgrCompletedAppsEvent(Arrays - .asList(new ApplicationId[] { appId }), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); + containerManager.handle(new CMgrCompletedAppsEvent(Collections. + singletonList(appId), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); BaseContainerManagerTest.waitForApplicationState(containerManager, cId.getApplicationAttemptId().getApplicationId(), ApplicationState.FINISHED); // Now ascertain that the resources are localised correctly. - for (File f : new File[] { appDir, containerDir, appSysDir, + for (File f : new File[] {appDir, containerDir, appSysDir, containerSysDir }) { // Wait for deletion. Deletion can happen long after AppFinish because of // the async DeletionService @@ -1143,6 +1147,265 @@ public void testLocalFilesCleanup() throws InterruptedException, targetFile.exists()); } + /** + * Verify whether the container dir exists or not. + * @param message description of what is being tested + * @param appIDStr application ID + * @param containerIDStr container ID + * @param shouldExist test existence or absence + */ + private void verifyContainerDir(String message, String appIDStr, + String containerIDStr, boolean shouldExist) { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + File containerDir = new File(appDir, containerIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + File containerSysDir = new File(appSysDir, containerIDStr); + + if (shouldExist) { + // ContainerDir should still exist + Assert.assertTrue(message + " ContainerDir " + + containerDir.getAbsolutePath() + + " doesn't exist", containerDir.exists()); + Assert.assertTrue(message + " ContainerSysDir " + containerSysDir + .getAbsolutePath() + + " doesn't exist", containerSysDir.exists()); + } else { + // ContainerDir should not exist + Assert.assertFalse(message + " ContainerDir " + + containerDir.getAbsolutePath() + + " exists", containerDir.exists()); + Assert.assertFalse(message + " ContainerSysDir " + + containerSysDir.getAbsolutePath() + + " exists", containerSysDir.exists()); + } + } + + /** + * Verify whether the application dir exists or not. + * @param message description of what is being tested + * @param appIDStr application ID + * @param shouldExist test existence or absence + */ + private void verifyAppDir(String message, String appIDStr, + boolean shouldExist) { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + + if (shouldExist) { + // appDir should still exist + Assert.assertTrue(message + " appDir " + appDir.getAbsolutePath() + + " doesn't exist", appDir.exists()); + Assert.assertTrue("appSysDir " + appSysDir.getAbsolutePath() + + " doesn't exist", appSysDir.exists()); + } else { + // appDir should not exist + Assert.assertFalse(message + " appDir " + appDir.getAbsolutePath() + + " exists", appDir.exists()); + Assert.assertFalse("appSysDir " + appSysDir.getAbsolutePath() + + " exists", appSysDir.exists()); + } + } + + /** + * Wait for an application directory deleted or the timeout elapsed. + * @param appIDStr application id + * @param timeoutMs timeout in milliseconds + * @throws InterruptedException the thread wait was interrupted + */ + private void waitForApplicationDirDeleted(String appIDStr, long timeoutMs) + throws InterruptedException { + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + File appDir = new File(appCache, appIDStr); + + File sysDir = + new File(localDir, + ResourceLocalizationService.NM_PRIVATE_DIR); + File appSysDir = new File(sysDir, appIDStr); + + // wait for the deletion of the directory + long start = System.currentTimeMillis(); + while ((appDir.exists() || appSysDir.exists()) && + System.currentTimeMillis() - start < timeoutMs) { + Thread.sleep(10); + } + } + + /** + * Run a container with delay, so that delayed deletion of resources can be + * tested. + * @param debugDelaySec delay container deletion with this number of seconds + * @return the application that ran + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + private Map.Entry runContainerWithDebugDelay( + int debugDelaySec) + throws IOException, YarnException, InterruptedException { + conf.setInt( + YarnConfiguration.DEBUG_NM_MAX_PER_APPLICATION_DELETE_DELAY_SEC, + debugDelaySec); + delSrvc = new DeletionService(exec); + delSrvc.init(conf); + containerManager = createContainerManager(delSrvc); + containerManager.init(conf); + containerManager.start(); + + List list = new ArrayList<>(); + + // Create a delayed cleanup container that sleeps for a second + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + Map userSetEnv = new HashMap(); + userSetEnv.put(ApplicationConstants.Environment.DEBUG_DELETE_DELAY.key(), + Integer.toString(debugDelaySec)); + containerLaunchContext.setEnvironment(userSetEnv); + containerLaunchContext.setCommands(Collections.singletonList("sleep 1")); + + ContainerId cId = createContainerId(0); + ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context + .getNodeId(), + user, context.getContainerTokenSecretManager())); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + // Simulate RM closing the app after the containers finished + Application app = + containerManager.getContext().getApplications().get(appId); + List appIds = Collections.singletonList(appId); + containerManager.handle(new CMgrCompletedAppsEvent(appIds, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + + return new AbstractMap.SimpleEntry<>(app, cId); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + @Test + public void testDelayedDeletionContainerOnly() + throws IOException, YarnException, InterruptedException { + final int debugDelaySec = 3; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay( + debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + // Verify that containers are kept as specified in the policy + Thread.sleep(debugDelaySec * 1000 / 2); + verifyContainerDir("Verify that the container directory is still alive by" + + " deletion policy", appId, cId, true); + + // Verify that containers are get eventually deleted + Thread.sleep(debugDelaySec * 1000 / 2); + waitForApplicationDirDeleted(appId, 5000); + verifyContainerDir("Verify that the container directory is deleted by " + + "deletion policy", appId, cId, false); + } + + /** + * Launch a container in debug mode keeping the container files for a few + * seconds. + * Test that application dir deletion does not override the specified delay + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + @Test + public void testDelayedDeletion() + throws IOException, YarnException, InterruptedException { + final int debugDelaySec = 3; + + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay( + debugDelaySec); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + int retry = 150; + while(--retry > 0 && + pair.getKey().getApplicationState()==ApplicationState.RUNNING) { + Thread.sleep(10); + } + + // Verify that containers are kept as specified in the policy + Thread.sleep(debugDelaySec * 1000 / 2); + verifyAppDir("Verify that the application directory is still alive by " + + "policy", appId, true); + verifyContainerDir("Verify that the container directory is still alive by" + + " policy", appId, cId, true); + + // Verify that containers are get eventually deleted with the application + Thread.sleep(debugDelaySec * 1000 / 2); + waitForApplicationDirDeleted(appId, 5000); + verifyContainerDir("Verify that the container directory is deleted by " + + "policy", appId, cId, false); + verifyAppDir("Verify that the application directory is deleted by " + + "policy", appId, false); + } + + /** + * Launch a container in debug mode keeping the container files for a + * very long time. + * Test that application dir deletion does not override the specified delay + * @throws IOException file I/O failed + * @throws YarnException general YARN execution error + * @throws InterruptedException thread interrupted + */ + @Test + public void testDelayedKeep() throws IOException, YarnException, + InterruptedException { + // Run app with container and wait until it is completed + Map.Entry pair = runContainerWithDebugDelay( + Integer.MAX_VALUE); + String appId = pair.getKey().getAppId().toString(); + String cId = pair.getValue().toString(); + + int retry = 150; + while(--retry > 0 && + pair.getKey().getApplicationState()==ApplicationState.RUNNING) { + Thread.sleep(10); + } + + // Verify that containers are kept indefinitely + Thread.sleep(200); + verifyAppDir("Verify that the application is kept forever", appId, true); + verifyContainerDir("Verify that the container is kept forever", appId, + cId, true); + } + @Test public void testContainerLaunchFromPreviousRM() throws IOException, InterruptedException, YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java index 7f06afa..b4df427 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -120,7 +120,8 @@ public long getVCoresAllocatedForContainers() { protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void deleteWithDelay(String user, int debugDelaySec, Path subDir, + Path... baseDirs) { // Don't do any deletions. if (shouldDeleteWait) { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 8feca21..6bc3812 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -99,4 +100,8 @@ public String getFlowVersion() { public long getFlowRunId() { return flowRunId; } + + public Date getDelayedDeletionTime() { + return null; + } }