diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 29442bc..9b6e788 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -89,6 +88,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import com.google.common.annotations.VisibleForTesting; + /** * An ApplicationMaster for executing shell commands on a set of launched * containers using the YARN framework. @@ -169,7 +170,8 @@ private NMCallbackHandler containerListener; // Application Attempt Id ( combination of attemptId and fail count ) - private ApplicationAttemptId appAttemptID; + @VisibleForTesting + protected ApplicationAttemptId appAttemptID; // TODO // For status update for clients - yet to be implemented @@ -194,13 +196,15 @@ private AtomicInteger numCompletedContainers = new AtomicInteger(); // Allocated container count so that we know how many containers has the RM // allocated to us - private AtomicInteger numAllocatedContainers = new AtomicInteger(); + @VisibleForTesting + protected AtomicInteger numAllocatedContainers = new AtomicInteger(); // Count of failed containers private AtomicInteger numFailedContainers = new AtomicInteger(); // Count of containers already requested from the RM // Needed as once requested, we should not request for containers again. // Only request for more if the original requirement changes. - private AtomicInteger numRequestedContainers = new AtomicInteger(); + @VisibleForTesting + protected AtomicInteger numRequestedContainers = new AtomicInteger(); // Shell command to be executed private String shellCommand = ""; @@ -251,6 +255,7 @@ public static void main(String[] args) { System.exit(0); } result = appMaster.run(); + appMaster.finish(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); System.exit(1); @@ -537,26 +542,25 @@ public boolean run() throws YarnException, IOException { containerVirtualCores = maxVCores; } + List previousAMRunningContainers = + response.getContainersFromPreviousAttempt(); + LOG.info("Received " + previousAMRunningContainers.size() + + " previous AM's running containers on AM registration."); + numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); + + int numTotalContainersToRequest = + numTotalContainers - previousAMRunningContainers.size(); // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). - for (int i = 0; i < numTotalContainers; ++i) { + for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); } - numRequestedContainers.set(numTotalContainers); - - while (!done - && (numCompletedContainers.get() != numTotalContainers)) { - try { - Thread.sleep(200); - } catch (InterruptedException ex) {} - } - finish(); - + numRequestedContainers.set(numTotalContainersToRequest); return success; } @@ -565,7 +569,15 @@ NMCallbackHandler createNMCallbackHandler() { return new NMCallbackHandler(this); } - private void finish() { + protected void finish() { + // wait for completion. + while (!done + && (numCompletedContainers.get() != numTotalContainers)) { + try { + Thread.sleep(200); + } catch (InterruptedException ex) {} + } + // Join all launched threads // needed for when we time out // and we need to release containers diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 2257c42..333486d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -162,6 +162,9 @@ // Timeout threshold for client. Kill app after time interval expires. private long clientTimeout = 600000; + // flag to indicate whether to keep containers across application attempts. + private boolean keepContainers = false; + // Debug flag boolean debugFlag = false; @@ -243,6 +246,11 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); + opts.addOption("keep_containers_across_application_attempts", false, + "Flag to indicate whether to keep containers across application attempts." + + " If the flag is true, running containers will not be killed when" + + " application attempt fails and these containers will be retrieved by" + + " the new application attempt "); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -294,12 +302,17 @@ public boolean init(String[] args) throws ParseException { } + if (cliParser.hasOption("keep_containers_across_application_attempts")) { + LOG.info("keep_containers_across_application_attempts"); + keepContainers = true; + } + appName = cliParser.getOptionValue("appname", "DistributedShell"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); - + if (amMemory < 0) { throw new IllegalArgumentException("Invalid memory specified for application master, exiting." + " Specified memory=" + amMemory); @@ -442,6 +455,8 @@ public boolean run() throws IOException, YarnException { // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); + + appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); // Set up the container launch context for the application master diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java index 2692fff..e845490 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java @@ -67,6 +67,7 @@ public static void main(String[] args) { System.exit(0); } result = appMaster.run(); + appMaster.finish(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); System.exit(1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java new file mode 100644 index 0000000..644f667 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class TestDSFailedAppMaster extends ApplicationMaster { + + private static final Log LOG = LogFactory.getLog(TestDSFailedAppMaster.class); + + @Override + public boolean run() throws YarnException, IOException { + boolean res = super.run(); + + // for the 2nd attempt. + if (appAttemptID.getAttemptId() == 2) { + // should reuse the earlier running container, so numAllocatedContainers + // should be set to 1. And should ask no more containers, so + // numRequestedContainers should be set to 0. + if (numAllocatedContainers.get() != 1 + || numRequestedContainers.get() != 0) { + LOG.info("Application Master failed. exiting"); + System.exit(200); + } + } + return res; + } + + public static void main(String[] args) { + boolean result = false; + try { + TestDSFailedAppMaster appMaster = new TestDSFailedAppMaster(); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + result = appMaster.run(); + if (appMaster.appAttemptID.getAttemptId() == 1) { + try { + // sleep some time, wait for the AM to launch a container. + Thread.sleep(3000); + } catch (InterruptedException e) {} + // fail the first am. + System.exit(100); + } + appMaster.finish(); + } catch (Throwable t) { + System.exit(1); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 7efe8e8..97522f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -175,6 +175,35 @@ public void run() { } @Test(timeout=90000) + public void testDSRestartWithPreviousRunningContainers() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "timeout 8" : "sleep 8", + "--master_memory", + "512", + "--container_memory", + "128", + "--keep_containers_across_application_attempts" + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(TestDSFailedAppMaster.class.getName(), + new Configuration(yarnCluster.getConfig())); + + client.init(args); + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + // application should succeed + Assert.assertTrue(result); + } + + @Test(timeout=90000) public void testDSShellWithCustomLogPropertyFile() throws Exception { final File basedir = new File("target", TestDistributedShell.class.getName());