diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 228f184..daa5900 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -46,6 +46,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; @@ -53,6 +54,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -74,12 +76,14 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -88,6 +92,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -158,6 +163,18 @@ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + @VisibleForTesting + @Private + public static enum AppEvent { + DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END + } + + @VisibleForTesting + @Private + public static enum DSEntity { + DS_APP_ATTEMPT, DS_CONTAINER + } + // Configuration private Configuration conf; @@ -239,6 +256,9 @@ // Launch threads private List launchThreads = new ArrayList(); + // ATS Client + private static TimelineClient timelineClient; + private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; @@ -258,7 +278,8 @@ public static void main(String[] args) { result = appMaster.finish(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); - System.exit(1); + LogManager.shutdown(); + ExitUtil.terminate(1, t); } if (result) { LOG.info("Application Master completed successfully. exiting"); @@ -461,6 +482,12 @@ public boolean init(String[] args) throws ParseException, IOException { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + // Creating the Application Timeline CLient + Configuration config = new YarnConfiguration(); + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(config); + timelineClient.start(); + return true; } @@ -482,6 +509,14 @@ private void printUsage(Options opts) { @SuppressWarnings({ "unchecked" }) public void run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); + try { + ApplicationMaster.publishApplicationAttemptEvent(timelineClient, + appAttemptID.toString(), AppEvent.DS_APP_ATTEMPT_START); + } catch (Exception e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); @@ -561,6 +596,14 @@ public void run() throws YarnException, IOException { amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainersToRequest); + try { + ApplicationMaster.publishApplicationAttemptEvent(timelineClient, + appAttemptID.toString(), AppEvent.DS_APP_ATTEMPT_END); + } catch (Exception e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString()); + LOG.error(e); + } } @VisibleForTesting @@ -665,6 +708,18 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } + try { + ApplicationMaster + .publishContainerEndEvent(timelineClient, containerStatus); + } catch (IOException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString()); + LOG.error(e); + } } // ask for more containers if any failed @@ -779,6 +834,17 @@ public void onContainerStarted(ContainerId containerId, if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } + try { + ApplicationMaster.publishContainerStartEvent(timelineClient, container); + } catch (IOException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } catch (YarnException e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString()); + LOG.error(e); + } } @Override @@ -951,4 +1017,54 @@ private String readContent(String filePath) throws IOException { org.apache.commons.io.IOUtils.closeQuietly(ds); } } + + private static void publishContainerStartEvent(TimelineClient timelineClient, + Container container) throws IOException, YarnException { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(container.getId().toString()); + entity.setEntityType(DSEntity.DS_CONTAINER.toString()); + entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() + .toString()); + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.DS_CONTAINER_START.toString()); + event.addEventInfo("Node", container.getNodeId().toString()); + event.addEventInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + timelineClient.putEntities(entity); + } + + private static void publishContainerEndEvent(TimelineClient timelineClient, + ContainerStatus container) throws IOException, YarnException { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(container.getContainerId().toString()); + entity.setEntityType(DSEntity.DS_CONTAINER.toString()); + entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() + .toString()); + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(AppEvent.DS_CONTAINER_END.toString()); + event.addEventInfo("State", container.getState().name()); + event.addEventInfo("Exit Status", container.getExitStatus()); + entity.addEvent(event); + + timelineClient.putEntities(entity); + } + + private static void publishApplicationAttemptEvent( + TimelineClient timelineClient, String appAttemptId, AppEvent appEvent) + throws IOException, YarnException { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(appAttemptId); + entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); + entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() + .toString()); + TimelineEvent event = new TimelineEvent(); + event.setEventType(appEvent.toString()); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + + timelineClient.putEntities(entity); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 97522f3..aacc956 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -41,6 +41,9 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEntity; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -171,7 +174,24 @@ public void run() { t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - + + TimelineEntities entitiesAttempts = yarnCluster + .getApplicationHistoryServer() + .getTimelineStore() + .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(), 1l, + null, null, null, null, null); + Assert.assertNotNull(entitiesAttempts); + Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() + .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + + TimelineEntities entities = yarnCluster + .getApplicationHistoryServer() + .getTimelineStore() + .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), 1l, + null, null, null, null, null); + Assert.assertNotNull(entities); + Assert.assertEquals(entities.getEntities().get(0).getEntityType() + .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); } @Test(timeout=90000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 89e13f4..36f25f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -169,5 +169,12 @@ protected void startWebApp() { throw new YarnRuntimeException(msg, e); } } - + /** + * @return ApplicationTimelineStore for the AHS. + */ + @Private + @VisibleForTesting + public TimelineStore getTimelineStore() { + return timelineStore; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index ff2e995..563117f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; @@ -100,6 +101,9 @@ private ResourceManager[] resourceManagers; private String[] rmIds; + private ApplicationHistoryServer appHistoryServer; + private ApplicationHistoryServerWrapper appHistoryServerWrapper; + private boolean useFixedPorts; private boolean useRpc = false; private int failoverTimeout; @@ -241,6 +245,8 @@ protected void doSecureLogin() throws IOException { addService(new NodeManagerWrapper(index)); } + addService(new ApplicationHistoryServerWrapper()); + super.serviceInit( conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } @@ -649,4 +655,67 @@ public boolean waitForNodeManagersToConnect(long timeout) } return false; } + + private class ApplicationHistoryServerWrapper extends AbstractService { + public ApplicationHistoryServerWrapper() { + super(ApplicationHistoryServerWrapper.class.getName()); + } + + @Override + protected synchronized void serviceInit(Configuration conf) + throws Exception { + if (!conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { + conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS); + conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); + } + appHistoryServer = new ApplicationHistoryServer(); + appHistoryServer.init(conf); + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStart() throws Exception { + try { + new Thread() { + public void run() { + appHistoryServer.start(); + }; + }.start(); + int waitCount = 0; + while (appHistoryServer.getServiceState() == STATE.INITED + && waitCount++ < 60) { + LOG.info("Waiting for AHS to start..."); + Thread.sleep(1500); + } + if (appHistoryServer.getServiceState() != STATE.STARTED) { + // AHS could have failed. + throw new IOException( + "ApplicationHistoryServer failed to start. Final state is " + + appHistoryServer.getServiceState()); + } + super.serviceStart(); + } catch (Throwable t) { + throw new YarnRuntimeException(t); + } + LOG.info("MiniYARN ApplicationHistoryServer address: " + + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS)); + LOG.info("MiniYARN ApplicationHistoryServer web address: " + + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS)); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (appHistoryServer != null) { + appHistoryServer.stop(); + } + super.serviceStop(); + } + } + + public ApplicationHistoryServer getApplicationHistoryServer() { + return this.appHistoryServer; + } }