From 520198518466128ba8611bc284fabbf8f0ef0c43 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Sat, 30 Mar 2019 09:43:11 +0530 Subject: [PATCH] YARN-9227 --- .../distributedshell/ApplicationMaster.java | 19 ++++++++++- .../yarn/applications/distributedshell/Client.java | 29 ++++++++++------- .../distributedshell/TestDistributedShell.java | 37 ++++++++++++++++++++++ 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 5d437c9..f4d061a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -389,8 +389,9 @@ */ public static void main(String[] args) { boolean result = false; + ApplicationMaster appMaster = null; try { - ApplicationMaster appMaster = new ApplicationMaster(); + appMaster = new ApplicationMaster(); LOG.info("Initializing ApplicationMaster"); boolean doRun = appMaster.init(args); if (!doRun) { @@ -402,6 +403,10 @@ public static void main(String[] args) { LOG.error("Error running ApplicationMaster", t); LogManager.shutdown(); ExitUtil.terminate(1, t); + } finally { + if (appMaster != null) { + appMaster.cleanup(); + } } if (result) { LOG.info("Application Master completed successfully. exiting"); @@ -768,6 +773,18 @@ private void printUsage(Options opts) { new HelpFormatter().printHelp("ApplicationMaster", opts); } + private void cleanup() { + Path dst = null; + try { + FileSystem fs = FileSystem.get(conf); + dst = new Path(fs.getHomeDirectory(), getRelativePath(appName, + appId.toString(), "")); + fs.delete(dst, true); + } catch(IOException e) { + LOG.warn("Failed to remove application staging directory {}", dst); + } + } + /** * Main run function for the application master * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index ef00e3e..08c6b83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,6 +148,7 @@ private YarnClient yarnClient; // Application master specific info to register a new Application with RM/ASM private String appName = ""; + private ApplicationId applicationId; // App master priority private int amPriority = 0; // Queue for App master @@ -759,7 +761,7 @@ public boolean run() throws IOException, YarnException { // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); - ApplicationId appId = appContext.getApplicationId(); + applicationId = appContext.getApplicationId(); // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and @@ -800,13 +802,13 @@ public boolean run() throws IOException, YarnException { // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(conf); - addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), - localResources, null); + addToLocalResources(fs, appMasterJar, appMasterJarPath, + applicationId.toString(), localResources, null); // Set the log4j properties if needed if (!log4jPropFile.isEmpty()) { - addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), - localResources, null); + addToLocalResources(fs, log4jPropFile, log4jPath, + applicationId.toString(), localResources, null); } // Process local files for localization @@ -833,7 +835,7 @@ public boolean run() throws IOException, YarnException { try { String fileName = f.getName(); - uploadFile(fs, path, fileName, appId.toString()); + uploadFile(fs, path, fileName, applicationId.toString()); if (localizableFiles.length() == 0) { localizableFiles.append(fileName); } else { @@ -857,7 +859,7 @@ public boolean run() throws IOException, YarnException { Path shellSrc = new Path(shellScriptPath); String shellPathSuffix = ApplicationMaster.getRelativePath(appName, - appId.toString(), + applicationId.toString(), SCRIPT_PATH); Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix); @@ -869,12 +871,12 @@ public boolean run() throws IOException, YarnException { } if (!shellCommand.isEmpty()) { - addToLocalResources(fs, null, shellCommandPath, appId.toString(), + addToLocalResources(fs, null, shellCommandPath, applicationId.toString(), localResources, shellCommand); } if (shellArgs.length > 0) { - addToLocalResources(fs, null, shellArgsPath, appId.toString(), + addToLocalResources(fs, null, shellArgsPath, applicationId.toString(), localResources, StringUtils.join(shellArgs, " ")); } @@ -1033,7 +1035,7 @@ public boolean run() throws IOException, YarnException { if (dockerClientConfig != null) { dockerCredentials = DockerClientConfigHandler.readCredentialsFromConfigFile( - new Path(dockerClientConfig), conf, appId.toString()); + new Path(dockerClientConfig), conf, applicationId.toString()); } if (rmCredentials != null || dockerCredentials != null) { @@ -1071,7 +1073,7 @@ public boolean run() throws IOException, YarnException { // app submission failure? // Monitor the application - return monitorApplication(appId); + return monitorApplication(applicationId); } @@ -1200,6 +1202,11 @@ private void uploadFile(FileSystem fs, String fileSrcPath, fs.copyFromLocalFile(new Path(fileSrcPath), dst); } + @VisibleForTesting + ApplicationId getAppId() { + return applicationId; + } + private void prepareTimelineDomain() { TimelineClient timelineClient = null; if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 9252ee5..025e543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.applications.distributedshell; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -1809,4 +1810,40 @@ public void testDistributedShellWithNonExistentFileLocalization() client.init(args); client.run(); } + + + @Test + public void testDistributedShellCleanup() + throws Exception { + String appName = "DistributedShellCleanup"; + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--appname", + appName + }; + Configuration config = new Configuration(yarnCluster.getConfig()); + Client client = new Client(config); + client.init(args); + client.run(); + ApplicationId appId = client.getAppId(); + String relativePath = + ApplicationMaster.getRelativePath(appName, appId.toString(), ""); + FileSystem fs1 = FileSystem.get(config); + Path path = new Path(fs1.getHomeDirectory(), relativePath); + + GenericTestUtils.waitFor(() -> { + try { + return !fs1.exists(path); + } catch (IOException e) { + return false; + } + }, 10, 60000); + + assertFalse("Distributed Shell Cleanup failed", fs1.exists(path)); + } } -- 2.7.4 (Apple Git-66)