diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3644761..ca46d83 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -74,6 +76,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntityWithEvents; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; +import org.apache.hadoop.yarn.client.api.ATSClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; @@ -151,6 +156,13 @@ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + private static enum AppEvent { + APP_ATTEMPT_START, + APP_ATTEMPT_END, + CONTAINER_START, + CONTAINER_END + } + // Configuration private Configuration conf; @@ -222,6 +234,9 @@ // Launch threads private List launchThreads = new ArrayList(); + + // ATS Client + private static ATSClient atsClient; /** * @param args Command line args @@ -436,6 +451,11 @@ public boolean init(String[] args) throws ParseException, IOException { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + // Creating the Application Timeline CLient + Configuration config = new YarnConfiguration(); + atsClient = ATSClient.createAMATSClient(); + atsClient.init(config); + atsClient.start(); return true; } @@ -457,7 +477,18 @@ private void printUsage(Options opts) { @SuppressWarnings({ "unchecked" }) public boolean run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); - + try { + ApplicationMaster.publishApplicationAttemptEvent(atsClient, appAttemptID + .toString(), AppEvent.APP_ATTEMPT_START); + } catch (IOException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); DataOutputBuffer dob = new DataOutputBuffer(); @@ -536,7 +567,18 @@ public boolean run() throws YarnException, IOException { } catch (InterruptedException ex) {} } finish(); - + try { + ApplicationMaster.publishApplicationAttemptEvent(atsClient, appAttemptID + .toString(), AppEvent.APP_ATTEMPT_END); + } catch (IOException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } return success; } @@ -631,6 +673,18 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } + try { + ApplicationMaster + .publishContainerEndEvent(atsClient, containerStatus); + } catch (IOException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } } // ask for more containers if any failed @@ -745,6 +799,17 @@ public void onContainerStarted(ContainerId containerId, if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } + try { + ApplicationMaster.publishContainerStartEvent(atsClient, container); + } catch (IOException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } } @Override @@ -900,4 +965,46 @@ private ContainerRequest setupContainerAskForRM() { LOG.info("Requested container ask: " + request.toString()); return request; } + + private static void publishContainerStartEvent(ATSClient atsClient, + Container container) throws IOException, YarnException { + ATSEntityWithEvents entity = new ATSEntityWithEvents(); + entity.setEntityId(container.getId().toString()); + entity.setEntityType("ContainerId"); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.CONTAINER_START.toString()); + event.addEventInfo("Node", container.getNodeId().toString()); + event.addEventInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } + + private static void publishContainerEndEvent(ATSClient atsClient, + ContainerStatus container) throws IOException, YarnException { + ATSEntityWithEvents entity = new ATSEntityWithEvents(); + entity.setEntityId(container.getContainerId().toString()); + entity.setEntityType("ContainerId"); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.CONTAINER_END.toString()); + event.addEventInfo("State", container.getState().name()); + event.addEventInfo("Exit STatus", container.getExitStatus()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } + + private static void publishApplicationAttemptEvent(ATSClient atsClient, String appAttemptId, + AppEvent appEvent) throws IOException, YarnException { + ATSEntityWithEvents entity = new ATSEntityWithEvents(); + entity.setEntityId(appAttemptId); + entity.setEntityType(appEvent.toString()); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 2f311b5..79b0006 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.net.URL; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -40,6 +41,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.Entity; +import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.Event; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -106,7 +109,7 @@ public static void tearDown() throws IOException { } } - @Test(timeout=90000) + @Test(timeout=110000) public void testDSShell() throws Exception { String[] args = { @@ -149,6 +152,8 @@ public void run() { String hostName = NetUtils.getHostname(); boolean verified = false; while(!verified) { List apps = yarnClient.getApplications(); if (apps.size() == 0 ) { Thread.sleep(10); @@ -171,6 +176,72 @@ public void run() { } @Test(timeout=90000) + public void testATSEvents() throws Exception { + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + }; + + LOG.info("Initializing DS Client"); + final Client client = new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + String hostName = NetUtils.getHostname(); + boolean verified = false; + while(!verified) { + List apps = yarnClient.getApplications(); + if (apps.size() == 0 ) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + if (appReport.getHost().startsWith(hostName) + && appReport.getRpcPort() == -1) { + verified = true; + } + if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) { + break; + } + } + Assert.assertTrue(verified); + t.join(); + LOG.info("Client run completed. Result=" + result); + Assert.assertTrue(result.get()); + Map entities = yarnCluster.getApplicationHistoryServer() + .getTimelineStore().getEntities("ContainerId", 1l,null, null, null, null); + Assert.assertNull(entities); + + } + @Test(timeout=90000) public void testDSShellWithInvalidArgs() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 9329bab..1ce22cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -177,4 +177,12 @@ protected void startWebApp() { public InetSocketAddress getWebAppListenerAddress() { return webApp.getListenerAddress(); } + + /** + * @return ApplicationTimelineStore for the AHS. + */ + @VisibleForTesting + public ApplicationTimelineStore getTimelineStore() { + return timelineStore; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index a03bb0b..0b35723 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -184,6 +184,10 @@ public ResourceManager getResourceManager() { return this.resourceManager; } + public ApplicationHistoryServer getApplicationHistoryServer() { + return this.appHistoryServer; + } + public NodeManager getNodeManager(int i) { return this.nodeManagers[i]; }