diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 959ba1c..fa6eb90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -281,8 +282,8 @@ private void dumpOutDebugInfo() { } } - public ApplicationMaster() throws Exception { - // Set up the configuration and RPC + public ApplicationMaster() { + // Set up the configuration conf = new YarnConfiguration(); } @@ -470,7 +471,7 @@ public boolean run() throws YarnException, IOException { amRMClient.init(conf); amRMClient.start(); - containerListener = new NMCallbackHandler(); + containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); @@ -500,7 +501,6 @@ public boolean run() throws YarnException, IOException { containerMemory = maxMem; } - // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for @@ -513,7 +513,8 @@ public boolean run() throws YarnException, IOException { } numRequestedContainers.set(numTotalContainers); - while (!done) { + while (!done + && (numCompletedContainers.get() != numTotalContainers)) { try { Thread.sleep(200); } catch (InterruptedException ex) {} @@ -522,7 +523,12 @@ public boolean run() throws YarnException, IOException { return success; } - + + @VisibleForTesting + NMCallbackHandler createNMCallbackHandler() { + return new NMCallbackHandler(this); + } + private void finish() { // Join all launched threads // needed for when we time out @@ -566,7 +572,6 @@ private void finish() { LOG.error("Failed to unregister application", e); } - done = true; amRMClient.stop(); } @@ -679,10 +684,17 @@ public void onError(Throwable e) { } } - private class NMCallbackHandler implements NMClientAsync.CallbackHandler { + @VisibleForTesting + static class NMCallbackHandler + implements NMClientAsync.CallbackHandler { private ConcurrentMap containers = new ConcurrentHashMap(); + private final ApplicationMaster applicationMaster; + + public NMCallbackHandler(ApplicationMaster applicationMaster) { + this.applicationMaster = applicationMaster; + } public void addContainer(ContainerId containerId, Container container) { containers.putIfAbsent(containerId, container); @@ -713,7 +725,7 @@ public void onContainerStarted(ContainerId containerId, } Container container = containers.get(containerId); if (container != null) { - nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); + applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } } @@ -721,6 +733,8 @@ public void onContainerStarted(ContainerId containerId, public void onStartContainerError(ContainerId containerId, Throwable t) { LOG.error("Failed to start Container " + containerId); containers.remove(containerId); + applicationMaster.numCompletedContainers.incrementAndGet(); + applicationMaster.numFailedContainers.incrementAndGet(); } @Override @@ -847,7 +861,6 @@ public void run() { /** * Setup the request that will be sent to the RM for the container ask. * - * @param numContainers Containers to ask for from RM * @return the setup ResourceRequest to be sent to RM */ private ContainerRequest setupContainerAskForRM() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 7d51a67..dbf486f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -125,8 +125,7 @@ // Application master jar file private String appMasterJar = ""; // Main class to invoke application master - private final String appMasterMainClass = - "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster"; + private final String appMasterMainClass; // Shell command to be executed private String shellCommand = ""; @@ -193,8 +192,14 @@ public static void main(String[] args) { /** */ public Client(Configuration conf) throws Exception { - + this( + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", + conf); + } + + Client(String appMasterMainClass, Configuration conf) { this.conf = conf; + this.appMasterMainClass = appMasterMainClass; yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); opts = new Options(); @@ -214,6 +219,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); + } /** @@ -628,6 +634,7 @@ private boolean monitorApplication(ApplicationId appId) + ", appMasterHost=" + report.getHost() + ", appQueue=" + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + + ", progress=" + report.getProgress() + ", appStartTime=" + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString() + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java new file mode 100644 index 0000000..2692fff --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ContainerLaunchFailAppMaster extends ApplicationMaster { + + private static final Log LOG = + LogFactory.getLog(ContainerLaunchFailAppMaster.class); + + public ContainerLaunchFailAppMaster() { + super(); + } + + @Override + NMCallbackHandler createNMCallbackHandler() { + return new FailContainerLaunchNMCallbackHandler(this); + } + + class FailContainerLaunchNMCallbackHandler + extends ApplicationMaster.NMCallbackHandler { + + public FailContainerLaunchNMCallbackHandler( + ApplicationMaster applicationMaster) { + super(applicationMaster); + } + + @Override + public void onContainerStarted(ContainerId containerId, + Map allServiceResponse) { + super.onStartContainerError(containerId, + new RuntimeException("Inject Container Launch failure")); + } + + } + + public static void main(String[] args) { + boolean result = false; + try { + ContainerLaunchFailAppMaster appMaster = + new ContainerLaunchFailAppMaster(); + LOG.info("Initializing ApplicationMaster"); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + result = appMaster.run(); + } catch (Throwable t) { + LOG.fatal("Error running ApplicationMaster", t); + System.exit(1); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 7fbd2a6..f8a41b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -59,7 +59,7 @@ protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @BeforeClass - public static void setup() throws InterruptedException, Exception { + public static void setup() throws Exception { LOG.info("Starting up YARN cluster"); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.setClass(YarnConfiguration.RM_SCHEDULER, @@ -135,7 +135,7 @@ public void run() { } catch (Exception e) { throw new RuntimeException(e); } - }; + } }; t.start(); @@ -248,5 +248,34 @@ protected static void waitForNMToRegister(NodeManager nm) Thread.sleep(2000); } } + + @Test(timeout=90000) + public void testContainerLaunchFailureHandling() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--container_memory", + "128" + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(ContainerLaunchFailAppMaster.class.getName(), + new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + Assert.assertFalse(result); + + } + }