diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3644761..ca46d83 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -74,6 +76,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntityWithEvents; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; +import org.apache.hadoop.yarn.client.api.ATSClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; @@ -151,6 +156,13 @@ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + private static enum AppEvent { + APP_ATTEMPT_START, + APP_ATTEMPT_END, + CONTAINER_START, + CONTAINER_END + } + // Configuration private Configuration conf; @@ -222,6 +234,9 @@ // Launch threads private List launchThreads = new ArrayList(); + + // ATS Client + private static ATSClient atsClient; /** * @param args Command line args @@ -436,6 +451,11 @@ public boolean init(String[] args) throws ParseException, IOException { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + // Creating the Application Timeline CLient + Configuration config = new YarnConfiguration(); + atsClient = ATSClient.createAMATSClient(); + atsClient.init(config); + atsClient.start(); return true; } @@ -457,7 +477,18 @@ private void printUsage(Options opts) { @SuppressWarnings({ "unchecked" }) public boolean run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); - + try { + ApplicationMaster.publishApplicationAttemptEvent(atsClient, appAttemptID + .toString(), AppEvent.APP_ATTEMPT_START); + } catch (IOException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); DataOutputBuffer dob = new DataOutputBuffer(); @@ -536,7 +567,18 @@ public boolean run() throws YarnException, IOException { } catch (InterruptedException ex) {} } finish(); - + try { + ApplicationMaster.publishApplicationAttemptEvent(atsClient, appAttemptID + .toString(), AppEvent.APP_ATTEMPT_END); + } catch (IOException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } return success; } @@ -631,6 +673,18 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } + try { + ApplicationMaster + .publishContainerEndEvent(atsClient, containerStatus); + } catch (IOException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } } // ask for more containers if any failed @@ -745,6 +799,17 @@ public void onContainerStarted(ContainerId containerId, if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } + try { + ApplicationMaster.publishContainerStartEvent(atsClient, container); + } catch (IOException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } } @Override @@ -900,4 +965,46 @@ private ContainerRequest setupContainerAskForRM() { LOG.info("Requested container ask: " + request.toString()); return request; } + + private static void publishContainerStartEvent(ATSClient atsClient, + Container container) throws IOException, YarnException { + ATSEntityWithEvents entity = new ATSEntityWithEvents(); + entity.setEntityId(container.getId().toString()); + entity.setEntityType("ContainerId"); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.CONTAINER_START.toString()); + event.addEventInfo("Node", container.getNodeId().toString()); + event.addEventInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } + + private static void publishContainerEndEvent(ATSClient atsClient, + ContainerStatus container) throws IOException, YarnException { + ATSEntityWithEvents entity = new ATSEntityWithEvents(); + entity.setEntityId(container.getContainerId().toString()); + entity.setEntityType("ContainerId"); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.CONTAINER_END.toString()); + event.addEventInfo("State", container.getState().name()); + event.addEventInfo("Exit STatus", container.getExitStatus()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } + + private static void publishApplicationAttemptEvent(ATSClient atsClient, String appAttemptId, + AppEvent appEvent) throws IOException, YarnException { + ATSEntityWithEvents entity = new ATSEntityWithEvents(); + entity.setEntityId(appAttemptId); + entity.setEntityType(appEvent.toString()); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } }