Index: hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/HelloWorld.java =================================================================== --- hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/HelloWorld.java (revision 0) +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/HelloWorld.java (revision 0) @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.applications.javaapp; + +public class HelloWorld { + public static void main(String[] args) { + System.out.println("Hello World from custom java class"); + System.err.println("Hello World from custom java class"); + } +} Index: hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/SimpleYarnJavaAppApplicationMaster.java =================================================================== --- hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/SimpleYarnJavaAppApplicationMaster.java (revision 0) +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/SimpleYarnJavaAppApplicationMaster.java (revision 0) @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.javaapp; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class SimpleYarnJavaAppApplicationMaster { + + private static final Log LOG = LogFactory.getLog(SimpleYarnJavaAppApplicationMaster.class); + + private Configuration conf; + private YarnRPC rpc; + private AMRMProtocol resourceManager; + + private ApplicationAttemptId appAttemptID; + + private AtomicInteger rmRequestID = new AtomicInteger(); + + private String customAppMainClass; + private boolean hasUserAppCompleted; + private boolean hasUserAppFailed; + private String failureInfo; + + public SimpleYarnJavaAppApplicationMaster() throws Exception { + conf = new Configuration(); + rpc = YarnRPC.create(conf); + } + + /** + * @param args + * Command line args + * @throws InterruptedException + * @throws IOException + */ + public static void main(final String[] args) throws IOException, + InterruptedException { + String jobUserName = System.getenv(ApplicationConstants.Environment.USER + .name()); + UserGroupInformation.setConfiguration(new YarnConfiguration()); + UserGroupInformation appMasterUgi = UserGroupInformation + .createRemoteUser(jobUserName); + appMasterUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + boolean result = false; + try { + final SimpleYarnJavaAppApplicationMaster appMaster = new SimpleYarnJavaAppApplicationMaster(); + LOG.info("Initializing ApplicationMaster"); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + result = appMaster.run(); + + } catch (Throwable t) { + LOG.fatal("Error running ApplicationMaster", t); + System.exit(1); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + return null; + } + }); + } + + /** + * Parse command line options + * + * @param args + * Command line args + * @return Whether init successful and run should be invoked + * @throws ParseException + * @throws IOException + */ + public boolean init(String[] args) throws ParseException, IOException { + + Options opts = new Options(); + opts.addOption("app_attempt_id", true, + "App Attempt ID. Not to be used unless for testing purposes"); + opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("customappmainclass", true, "Custom Function Class"); + + opts.addOption("help", false, "Print usage"); + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (args.length == 0) { + printUsage(opts); + throw new IllegalArgumentException( + "No args specified for application master to initialize"); + } + + if (cliParser.hasOption("help")) { + printUsage(opts); + return false; + } + + if (cliParser.hasOption("debug")) { + dumpOutDebugInfo(); + } + + Map envs = System.getenv(); + + appAttemptID = Records.newRecord(ApplicationAttemptId.class); + if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { + if (cliParser.hasOption("app_attempt_id")) { + String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); + appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); + } else { + throw new IllegalArgumentException( + "Application Attempt Id not set in the environment"); + } + } else { + ContainerId containerId = ConverterUtils.toContainerId(envs + .get(ApplicationConstants.AM_CONTAINER_ID_ENV)); + appAttemptID = containerId.getApplicationAttemptId(); + } + + LOG.info("Application master for app" + ", appId=" + + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" + + appAttemptID.getApplicationId().getClusterTimestamp() + + ", attemptId=" + appAttemptID.getAttemptId()); + + if (cliParser.hasOption("customappmainclass")) { + customAppMainClass = cliParser.getOptionValue("customappmainclass"); + } + return true; + } + + /** + * Main run function for the application master + * + * @throws YarnRemoteException + */ + public boolean run() throws YarnRemoteException { + LOG.info("Starting ApplicationMaster for Java Application."); + + resourceManager = connectToRM(); + registerToRM(); + + Thread appThread = new Thread() { + @Override + public void run() { + try { + Class customMainClass = Class.forName(customAppMainClass); + Method method = customMainClass.getMethod("main", String[].class); + method.invoke(null, new Object[] { new String[] {} }); + } catch (Throwable error) { + LOG.fatal("Failed to execute the custom application.", error); + hasUserAppFailed = true; + failureInfo = error.getMessage(); + } finally { + hasUserAppCompleted = true; + } + } + }; + + appThread.start(); + + while (hasUserAppCompleted == false) { + AllocateRequest req = Records.newRecord(AllocateRequest.class); + req.setResponseId(rmRequestID.incrementAndGet()); + req.setApplicationAttemptId(appAttemptID); + resourceManager.allocate(req); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info(e); + } + } + + LOG.info("Application completed. Signalling finish to RM"); + + FinishApplicationMasterRequest finishReq = Records + .newRecord(FinishApplicationMasterRequest.class); + finishReq.setAppAttemptId(appAttemptID); + boolean isSuccess = true; + if (hasUserAppFailed) { + finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + finishReq.setDiagnostics(failureInfo); + isSuccess = false; + } else { + finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + } + resourceManager.finishApplicationMaster(finishReq); + return isSuccess; + } + + private void dumpOutDebugInfo() { + + LOG.info("Dump debug output"); + Map envs = System.getenv(); + for (Map.Entry env : envs.entrySet()) { + LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); + System.out.println("System env: key=" + env.getKey() + ", val=" + + env.getValue()); + } + + String cmd = "ls -al"; + Runtime run = Runtime.getRuntime(); + Process pr = null; + try { + pr = run.exec(cmd); + pr.waitFor(); + + BufferedReader buf = new BufferedReader(new InputStreamReader(pr + .getInputStream())); + String line = ""; + while ((line = buf.readLine()) != null) { + LOG.info("System CWD content: " + line); + System.out.println("System CWD content: " + line); + } + buf.close(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private void printUsage(Options opts) { + new HelpFormatter().printHelp("Sample Java Yarn ApplicationMaster", opts); + } + + private AMRMProtocol connectToRM() { + YarnConfiguration yarnConf = new YarnConfiguration(conf); + InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); + LOG.info("Connecting to ResourceManager at " + rmAddress); + return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf)); + } + + private RegisterApplicationMasterResponse registerToRM() + throws YarnRemoteException { + RegisterApplicationMasterRequest appMasterRequest = Records + .newRecord(RegisterApplicationMasterRequest.class); + appMasterRequest.setApplicationAttemptId(appAttemptID); + appMasterRequest.setHost(""); + appMasterRequest.setRpcPort(0); + appMasterRequest.setTrackingUrl(""); + return resourceManager.registerApplicationMaster(appMasterRequest); + } +} Index: hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/SimpleYarnJavaAppClient.java =================================================================== --- hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/SimpleYarnJavaAppClient.java (revision 0) +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/javaapp/SimpleYarnJavaAppClient.java (revision 0) @@ -0,0 +1,489 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.javaapp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +/** + * Client for Submitting simple java application submission to YARN. + * + *

+ * The client allows an application master to be launched and that internally + * execute the functionality provided by the user. + *

+ * + */ + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class SimpleYarnJavaAppClient { + + private static final Log LOG = LogFactory.getLog(SimpleYarnJavaAppClient.class); + + private Configuration conf; + private YarnRPC rpc; + private ClientRMProtocol applicationsManager; + + private String appName = ""; + private int amPriority = 0; + private String amQueue = ""; + private String amUser = ""; + private int amMemory = 10; + private String log4jPropFile = ""; + private boolean debugFlag = false; + private String customAppJar; + private String customAppMainClass; + private long clientTimeout = 600000; + + private static final String appMasterMainClass = + "org.apache.hadoop.yarn.applications.javaapp.SimpleYarnJavaAppApplicationMaster"; + + private final long clientStartTime = System.currentTimeMillis(); + + public SimpleYarnJavaAppClient() throws Exception { + conf = new Configuration(); + rpc = YarnRPC.create(conf); + } + + public static void main(String[] args) { + boolean result = false; + try { + SimpleYarnJavaAppClient client = new SimpleYarnJavaAppClient(); + LOG.info("Initializing Client"); + boolean doRun = client.init(args); + if (!doRun) { + System.exit(0); + } + result = client.run(); + } catch (Throwable t) { + LOG.fatal("Failed to run Client", t); + System.exit(1); + } + if (result) { + LOG.info("Application completed successfully."); + System.exit(0); + } + LOG.error("Application failed to complete."); + System.exit(2); + } + + private void printUsage(Options opts) { + new HelpFormatter().printHelp("Client", opts); + } + + /** + * Parse command line options + * + * @param args + * Parsed command line options + * @return valid command line options or not + */ + public boolean init(String[] args) throws ParseException { + + Options opts = new Options(); + addOptions(opts); + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (cliParser.hasOption("help")) { + printUsage(opts); + return false; + } + if (cliParser.hasOption("debug")) { + debugFlag = true; + } + appName = cliParser.getOptionValue("appname", "SampleYarnJavaApplication"); + amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); + amQueue = cliParser.getOptionValue("queue", ""); + amUser = cliParser.getOptionValue("user", ""); + amMemory = Integer.parseInt(cliParser.getOptionValue("ammemory", "10")); + if (amMemory < 0) { + throw new IllegalArgumentException( + "Invalid memory specified for application master, exiting." + + " Specified memory=" + amMemory); + } + if (cliParser.hasOption("customappjar")) { + customAppJar = cliParser.getOptionValue("customappjar"); + if (cliParser.hasOption("customappmainclass") == false) { + throw new IllegalArgumentException( + "Custom application Main class is not set."); + } + customAppMainClass = cliParser.getOptionValue("customappmainclass"); + } else { + LOG + .info("No Custom application jar is specified. It takes default as am jar and main class as HelloWorld."); + customAppMainClass = "org.apache.hadoop.yarn.applications.javaapp.HelloWorld"; + } + clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", + "600000")); + log4jPropFile = cliParser.getOptionValue("log_properties", ""); + + return true; + } + + private void addOptions(Options opts) { + opts.addOption("appname", true, + "Application Name. Default value - SampleYarnJavaApplication"); + opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("queue", true, + "RM Queue in which this application is to be submitted"); + opts.addOption("user", true, "User to run the application as"); + opts.addOption("timeout", true, "Application timeout in milliseconds"); + opts.addOption("ammemory", true, + "Amount of memory in MB to be requested to run the application master"); + opts.addOption("customappjar", true, + "User Application jar to execute as part of Application Master."); + opts + .addOption("customappmainclass", true, + "Custom Application Main class to execute as part of Application Master."); + opts.addOption("log_properties", true, "log4j.properties file"); + opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("help", false, "Print usage"); + } + + /** + * Main run function for the client + * + * @return true if application completed successfully + * @throws IOException + */ + public boolean run() throws IOException { + LOG.info("Starting Client"); + + connectToRM(); + + GetNewApplicationResponse newApp = getApplication(); + ApplicationId appId = newApp.getApplicationId(); + + checkAMMemoryReqs(newApp); + + ApplicationSubmissionContext appContext = getApplicationSubmissionContext(appId); + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + + LOG.info("Submitting application to RM"); + applicationsManager.submitApplication(appRequest); + return monitorApplication(appId); + + } + + private void checkAMMemoryReqs(GetNewApplicationResponse newApp) { + int minMem = newApp.getMinimumResourceCapability().getMemory(); + int maxMem = newApp.getMaximumResourceCapability().getMemory(); + LOG.info("Min mem capabililty of resources in this cluster " + minMem); + LOG.info("Max mem capabililty of resources in this cluster " + maxMem); + + if (amMemory < minMem) { + LOG + .info("AM memory specified below min threshold of cluster. Using min value." + + ", specified=" + amMemory + ", min=" + minMem); + amMemory = minMem; + } else if (amMemory > maxMem) { + LOG + .info("AM memory specified above max threshold of cluster. Using max value." + + ", specified=" + amMemory + ", max=" + maxMem); + amMemory = maxMem; + } + } + + private ApplicationSubmissionContext getApplicationSubmissionContext( + ApplicationId appId) throws IOException { + LOG.info("Setting up application submission context for ASM"); + ApplicationSubmissionContext appContext = Records + .newRecord(ApplicationSubmissionContext.class); + appContext.setApplicationId(appId); + appContext.setApplicationName(appName); + ContainerLaunchContext amContainer = setupAndGetAMLaunchContext(appId); + appContext.setAMContainerSpec(amContainer); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(amPriority); + appContext.setPriority(pri); + appContext.setQueue(amQueue); + appContext.setUser(amUser); + return appContext; + } + + private ContainerLaunchContext setupAndGetAMLaunchContext(ApplicationId appId) + throws IOException { + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + + Map localResources = prepareAndGetLocalResources(appId); + + amContainer.setLocalResources(localResources); + Map env = setEnvironmentForAmContainer(); + amContainer.setEnvironment(env); + + List commands = getAMCommands(); + amContainer.setCommands(commands); + + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(amMemory); + amContainer.setResource(capability); + + return amContainer; + } + + private List getAMCommands() { + Vector vargs = new Vector(30); + + vargs.add("${JAVA_HOME}" + "/bin/java"); + vargs.add("-Xmx" + amMemory + "m"); + vargs.add(appMasterMainClass); + vargs.add("--customappmainclass " + customAppMainClass); + if (debugFlag) { + vargs.add("--debug"); + } + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + "/AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + "/AppMaster.stderr"); + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(' '); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List commands = new ArrayList(); + commands.add(command.toString()); + return commands; + } + + private Map prepareAndGetLocalResources( + ApplicationId appId) throws IOException { + Map localResources = new HashMap(); + + LOG + .info("Copy App Master jar from local filesystem and add to local environment"); + FileSystem fs = FileSystem.get(conf); + String appMasterJar = getApplicationMasterJar(); + LocalResource amJarRsrc = copyLocalResource(appId, fs, appMasterJar, + "AppMaster.jar"); + localResources.put("AppMaster.jar", amJarRsrc); + + LOG + .info("Copy Custom App jar from local filesystem and add to local environment"); + if (customAppJar != null) { + LocalResource customAppJarRsrc = copyLocalResource(appId, fs, + customAppJar, "CustomApp.jar"); + localResources.put("CustomApp.jar", customAppJarRsrc); + } + // Set the log4j properties if needed + if (!log4jPropFile.isEmpty()) { + LocalResource log4Jsrc = copyLocalResource(appId, fs, log4jPropFile, + "log4j.properties"); + localResources.put("log4j.properties", log4Jsrc); + } + return localResources; + } + + private Map setEnvironmentForAmContainer() { + LOG.info("Set the environment for the application master"); + Map env = new HashMap(); + StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*"); + for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH) + .split(",")) { + classPathEnv.append(':'); + classPathEnv.append(c.trim()); + } + classPathEnv.append(":./log4j.properties"); + env.put("CLASSPATH", classPathEnv.toString()); + return env; + } + + private LocalResource copyLocalResource(ApplicationId appId, FileSystem fs, + String resource, String resourceName) throws IOException { + Path src = new Path(resource); + String pathSuffix = appName + "/" + appId.getId() + "/" + resourceName; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + fs.copyFromLocalFile(false, true, src, dst); + FileStatus destStatus = fs.getFileStatus(dst); + LocalResource localResource = Records.newRecord(LocalResource.class); + localResource.setType(LocalResourceType.FILE); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + localResource.setTimestamp(destStatus.getModificationTime()); + localResource.setSize(destStatus.getLen()); + return localResource; + } + + /** + * Monitor the submitted application for completion. Kill application if time + * expires. + * + * @param appId + * Application Id of application to be monitored + * @return true if application completed successfully + * @throws YarnRemoteException + */ + private boolean monitorApplication(ApplicationId appId) + throws YarnRemoteException { + + while (true) { + GetApplicationReportRequest reportRequest = Records + .newRecord(GetApplicationReportRequest.class); + reportRequest.setApplicationId(appId); + GetApplicationReportResponse reportResponse = applicationsManager + .getApplicationReport(reportRequest); + ApplicationReport report = reportResponse.getApplicationReport(); + + LOG.info("Got application report from ASM for" + ", appId=" + + appId.getId() + ", clientToken=" + report.getClientToken() + + ", appDiagnostics=" + report.getDiagnostics() + ", appMasterHost=" + + report.getHost() + ", appQueue=" + report.getQueue() + + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime=" + + report.getStartTime() + ", yarnAppState=" + + report.getYarnApplicationState().toString() + + ", distributedFinalState=" + + report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + + report.getTrackingUrl() + ", appUser=" + report.getUser()); + + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); + if (YarnApplicationState.FINISHED == state) { + if (FinalApplicationStatus.SUCCEEDED == dsStatus) { + LOG + .info("Application has completed successfully. Breaking monitoring loop"); + return true; + } else { + LOG.info("Application did finished unsuccessfully." + " YarnState=" + + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + + ". Breaking monitoring loop"); + return false; + } + } else if (YarnApplicationState.KILLED == state + || YarnApplicationState.FAILED == state) { + LOG.info("Application did not finish." + " YarnState=" + + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + + ". Breaking monitoring loop"); + return false; + } + + if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + LOG + .info("Reached client specified timeout for application. Killing application"); + killApplication(appId); + return false; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + } + + } + + private void killApplication(ApplicationId appId) throws YarnRemoteException { + KillApplicationRequest request = Records + .newRecord(KillApplicationRequest.class); + request.setApplicationId(appId); + applicationsManager.forceKillApplication(request); + } + + private void connectToRM() throws IOException { + YarnConfiguration yarnConf = new YarnConfiguration(conf); + InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( + YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); + LOG.info("Connecting to ResourceManager at " + rmAddress); + applicationsManager = ((ClientRMProtocol) rpc.getProxy( + ClientRMProtocol.class, rmAddress, conf)); + } + + private GetNewApplicationResponse getApplication() throws YarnRemoteException { + GetNewApplicationRequest request = Records + .newRecord(GetNewApplicationRequest.class); + GetNewApplicationResponse response = applicationsManager + .getNewApplication(request); + LOG.info("Got new application id=" + response.getApplicationId()); + return response; + } + + protected String getApplicationMasterJar() { + ClassLoader loader = this.getClass().getClassLoader(); + String class_file = this.getClass().getName().replaceAll("\\.", "/") + + ".class"; + try { + for (Enumeration itr = loader.getResources(class_file); itr + .hasMoreElements();) { + URL url = (URL) itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } +} Index: hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/javaapp/TestYarnJavaApp.java =================================================================== --- hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/javaapp/TestYarnJavaApp.java (revision 0) +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/javaapp/TestYarnJavaApp.java (revision 0) @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.applications.javaapp; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestYarnJavaApp { + private MiniYARNCluster yarnCluster = null; + private Configuration conf = new Configuration(); + + @Before + public void setup() throws Exception { + if (yarnCluster == null) { + yarnCluster = new MiniYARNCluster(TestYarnJavaApp.class.getName(), 1, 1, + 1); + yarnCluster.init(conf); + yarnCluster.start(); + } + } + + @After + public void tearDown() throws IOException { + if (yarnCluster != null) { + yarnCluster.stop(); + yarnCluster = null; + } + } + + @Test + public void testYarnJavaApplication() throws Exception { + SimpleYarnJavaAppClient client = instantiateJavaAppClient(); + boolean initSuccess = client.init(new String[0]); + Assert.assertTrue(initSuccess); + boolean result = client.run(); + Assert.assertTrue(result); + } + + private SimpleYarnJavaAppClient instantiateJavaAppClient() throws Exception { + SimpleYarnJavaAppClient client = new SimpleYarnJavaAppClient() { + @Override + protected String getApplicationMasterJar() { + return JarFinder.getJar(ApplicationMaster.class); + } + }; + return client; + } +}