diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 8d869a2..f60e5f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -53,6 +53,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -78,7 +79,10 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -87,6 +91,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -157,6 +162,10 @@ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + private static enum AppEvent { + APP_ATTEMPT_START, APP_ATTEMPT_END, CONTAINER_START, CONTAINER_END + } + // Configuration private Configuration conf; @@ -238,6 +247,9 @@ // Launch threads private List launchThreads = new ArrayList(); + // ATS Client + private static TimelineClient atsClient; + private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; @@ -257,7 +269,8 @@ public static void main(String[] args) { result = appMaster.finish(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); - System.exit(1); + LogManager.shutdown(); + ExitUtil.terminate(1, t); } if (result) { LOG.info("Application Master completed successfully. exiting"); @@ -460,6 +473,12 @@ public boolean init(String[] args) throws ParseException, IOException { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + // Creating the Application Timeline CLient + Configuration config = new YarnConfiguration(); + atsClient = TimelineClient.createTimelineClient(); + atsClient.init(config); + atsClient.start(); + return true; } @@ -481,6 +500,18 @@ private void printUsage(Options opts) { @SuppressWarnings({ "unchecked" }) public void run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); + try { + ApplicationMaster.publishApplicationAttemptEvent(atsClient, + appAttemptID.toString(), AppEvent.APP_ATTEMPT_START); + } catch (IOException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); @@ -560,6 +591,18 @@ public void run() throws YarnException, IOException { amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainersToRequest); + try { + ApplicationMaster.publishApplicationAttemptEvent(atsClient, + appAttemptID.toString(), AppEvent.APP_ATTEMPT_END); + } catch (IOException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } } @VisibleForTesting @@ -664,6 +707,18 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } + try { + ApplicationMaster + .publishContainerEndEvent(atsClient, containerStatus); + } catch (IOException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } } // ask for more containers if any failed @@ -778,6 +833,17 @@ public void onContainerStarted(ContainerId containerId, if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } + try { + ApplicationMaster.publishContainerStartEvent(atsClient, container); + } catch (IOException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } } @Override @@ -950,4 +1016,46 @@ private String readContent(String filePath) throws IOException { org.apache.commons.io.IOUtils.closeQuietly(ds); } } + + private static void publishContainerStartEvent(TimelineClient atsClient, + Container container) throws IOException, YarnException { + ATSEntity entity = new ATSEntity(); + entity.setEntityId(container.getId().toString()); + entity.setEntityType("ContainerId"); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.CONTAINER_START.toString()); + event.addEventInfo("Node", container.getNodeId().toString()); + event.addEventInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } + + private static void publishContainerEndEvent(TimelineClient atsClient, + ContainerStatus container) throws IOException, YarnException { + ATSEntity entity = new ATSEntity(); + entity.setEntityId(container.getContainerId().toString()); + entity.setEntityType("ContainerId"); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.CONTAINER_END.toString()); + event.addEventInfo("State", container.getState().name()); + event.addEventInfo("Exit STatus", container.getExitStatus()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } + + private static void publishApplicationAttemptEvent(TimelineClient atsClient, + String appAttemptId, AppEvent appEvent) throws IOException, YarnException { + ATSEntity entity = new ATSEntity(); + entity.setEntityId(appAttemptId); + entity.setEntityType(appEvent.toString()); + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + + atsClient.postEntities(entity); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 97522f3..72d3db4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -29,6 +29,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -41,6 +42,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -171,7 +173,13 @@ public void run() { t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - + + ATSEntities entities = yarnCluster.getApplicationHistoryServer() + .getTimelineStore() + .getEntities("ContainerId", 1l, null, null, null, null, null); + Assert.assertNotNull(entities); + Assert.assertEquals(entities.getEntities().get(0).getEntityType() + .toString(), "ContainerId"); } @Test(timeout=90000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 73a0941..0da38fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -169,5 +169,11 @@ protected void startWebApp() { throw new YarnRuntimeException(msg, e); } } - + /** + * @return ApplicationTimelineStore for the AHS. + */ + @VisibleForTesting + public ApplicationTimelineStore getTimelineStore() { + return timelineStore; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 1412409..2190b25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; @@ -97,6 +98,9 @@ private ResourceManager[] resourceManagers; private String[] rmIds; + private ApplicationHistoryServer appHistoryServer; + private ApplicationHistoryServerWrapper appHistoryServerWrapper; + private boolean useFixedPorts; private boolean useRpc = false; private int failoverTimeout; @@ -238,6 +242,8 @@ protected void doSecureLogin() throws IOException { addService(new NodeManagerWrapper(index)); } + addService(new ApplicationHistoryServerWrapper()); + super.serviceInit( conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } @@ -646,4 +652,68 @@ public boolean waitForNodeManagersToConnect(long timeout) } return false; } + + private class ApplicationHistoryServerWrapper extends AbstractService { + public ApplicationHistoryServerWrapper() { + super(ApplicationHistoryServerWrapper.class.getName()); + } + + @Override + protected synchronized void serviceInit(Configuration conf) + throws Exception { + if (!conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { + // pick free random ports. + conf.set(YarnConfiguration.AHS_ADDRESS, + YarnConfiguration.DEFAULT_AHS_ADDRESS); + conf.set(YarnConfiguration.AHS_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS); + } + appHistoryServer = new ApplicationHistoryServer(); + appHistoryServer.init(conf); + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStart() throws Exception { + try { + new Thread() { + public void run() { + appHistoryServer.start(); + }; + }.start(); + int waitCount = 0; + while (appHistoryServer.getServiceState() == STATE.INITED + && waitCount++ < 60) { + LOG.info("Waiting for AHS to start..."); + Thread.sleep(1500); + } + if (appHistoryServer.getServiceState() != STATE.STARTED) { + // AHS could have failed. + throw new IOException( + "ApplicationHistoryServer failed to start. Final state is " + + appHistoryServer.getServiceState()); + } + super.serviceStart(); + } catch (Throwable t) { + throw new YarnRuntimeException(t); + } + LOG.info("MiniYARN ApplicationHistoryServer address: " + + getConfig().get(YarnConfiguration.AHS_ADDRESS)); + LOG.info("MiniYARN ApplicationHistoryServer web address: " + + getConfig().get(YarnConfiguration.AHS_WEBAPP_ADDRESS)); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (appHistoryServer != null) { + appHistoryServer.stop(); + } + super.serviceStop(); + } + } + + public ApplicationHistoryServer getApplicationHistoryServer() { + return this.appHistoryServer; + } }