diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptReport.java index 031573f..53c18ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptReport.java @@ -51,14 +51,15 @@ @Unstable public static ApplicationAttemptReport newInstance( ApplicationAttemptId applicationAttemptId, String host, int rpcPort, - String url, String diagnostics, YarnApplicationAttemptState state, - ContainerId amContainerId) { + String url, String oUrl, String diagnostics, + YarnApplicationAttemptState state, ContainerId amContainerId) { ApplicationAttemptReport report = Records.newRecord(ApplicationAttemptReport.class); report.setApplicationAttemptId(applicationAttemptId); report.setHost(host); report.setRpcPort(rpcPort); report.setTrackingUrl(url); + report.setOriginalTrackingUrl(oUrl); report.setDiagnostics(diagnostics); report.setYarnApplicationAttemptState(state); report.setAMContainerId(amContainerId); @@ -136,6 +137,19 @@ public abstract void setYarnApplicationAttemptState( public abstract void setTrackingUrl(String url); /** + * Get the original tracking url for the application attempt. 
+ * + * @return original tracking url for the application attempt + */ + @Public + @Unstable + public abstract String getOriginalTrackingUrl(); + + @Private + @Unstable + public abstract void setOriginalTrackingUrl(String oUrl); + + /** * Get the ApplicationAttemptId of this attempt of the * application * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7b7511d..31bb631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -316,6 +316,19 @@ public static final int DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = 10; + /** + * The setting that controls whether yarn system metrics is published on the + * timeline server or not by RM. 
+ */ + public static final String RM_METRICS_PUBLISHER_ENABLED = + RM_PREFIX + "metrics-publisher.enabled"; + public static final boolean DEFAULT_RM_METRICS_PUBLISHER_ENABLED = false; + + public static final String RM_METRICS_PUBLISHER_MULTI_DISPATCHER_POOL_SIZE = + RM_PREFIX + "metrics-publisher.dispatcher.pool-size"; + public static final int DEFAULT_RM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = + 10; + //Delegation token related keys public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3f1fa6c..b8bad64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -200,6 +200,7 @@ message ApplicationAttemptReportProto { optional string diagnostics = 5 [default = "N/A"]; optional YarnApplicationAttemptStateProto yarn_application_attempt_state = 6; optional ContainerIdProto am_container_id = 7; + optional string original_tracking_url = 8; } enum NodeStateProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 72cb1b1..120538c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -676,7 +676,7 @@ public QueueInfo createFakeQueueInfo() { public ApplicationAttemptReport createFakeApplicationAttemptReport() { return ApplicationAttemptReport.newInstance( - createFakeApplicationAttemptId(), "localhost", 0, "", "", + 
createFakeApplicationAttemptId(), "localhost", 0, "", "", "", YarnApplicationAttemptState.RUNNING, createFakeContainerId()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java index 797faa5..d3c182b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java @@ -346,6 +346,7 @@ private void createAppReports() { "host", 124, "url", + "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( @@ -357,6 +358,7 @@ private void createAppReports() { "host", 124, "url", + "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 6407f7a..8259893 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -457,6 +457,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "host", 124, "url", + "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( @@ -467,6 +468,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "host", 124, "url", + "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index b408b61..66a54dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -127,7 +127,7 @@ public void testGetApplicationAttemptReport() throws Exception { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( applicationId, 1); ApplicationAttemptReport attemptReport = ApplicationAttemptReport - .newInstance(attemptId, "host", 124, "url", "diagnostics", + .newInstance(attemptId, "host", 124, "url", "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( attemptId, 1)); when( @@ -163,11 +163,11 @@ public void testGetApplicationAttempts() throws Exception { ApplicationAttemptId attemptId1 = ApplicationAttemptId.newInstance( applicationId, 2); ApplicationAttemptReport attemptReport = ApplicationAttemptReport - .newInstance(attemptId, "host", 124, "url", "diagnostics", + .newInstance(attemptId, "host", 124, "url", "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( attemptId, 1)); ApplicationAttemptReport attemptReport1 = ApplicationAttemptReport - .newInstance(attemptId1, "host", 124, "url", "diagnostics", + .newInstance(attemptId1, "host", 124, "url", "oUrl", "diagnostics", YarnApplicationAttemptState.FINISHED, ContainerId.newInstance( attemptId1, 1)); List reports = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptReportPBImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptReportPBImpl.java index 8999987..c3c0221 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptReportPBImpl.java @@ -88,6 +88,15 @@ public String getTrackingUrl() { } @Override + public String getOriginalTrackingUrl() { + ApplicationAttemptReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasOriginalTrackingUrl()) { + return null; + } + return p.getOriginalTrackingUrl(); + } + + @Override public String getDiagnostics() { ApplicationAttemptReportProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasDiagnostics()) { @@ -161,6 +170,16 @@ public void setTrackingUrl(String url) { } @Override + public void setOriginalTrackingUrl(String oUrl) { + maybeInitBuilder(); + if (oUrl == null) { + builder.clearOriginalTrackingUrl(); + return; + } + builder.setOriginalTrackingUrl(oUrl); + } + + @Override public void setDiagnostics(String diagnostics) { maybeInitBuilder(); if (diagnostics == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/HtmlBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/HtmlBlock.java index 8f885bb..6ee0d1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/HtmlBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/HtmlBlock.java @@ -21,6 +21,7 @@ import java.io.PrintWriter; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.webapp.MimeType; import 
org.apache.hadoop.yarn.webapp.SubView; import org.apache.hadoop.yarn.webapp.WebAppException; @@ -81,4 +82,15 @@ public void renderPartial() { * @param html the block to render */ protected abstract void render(Block html); + + protected UserGroupInformation getCallerUGI() { + // Check for the authorization. + String remoteUser = request().getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9b4a90f..aa75fbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -616,23 +616,31 @@ - Number of worker threads that write the history data. - yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size - 10 - - - The class to use as the configuration provider. If org.apache.hadoop.yarn.LocalConfigurationProvider is used, the local configuration will be loaded. If org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider is used, the configuration which will be loaded should be uploaded to remote File system first. - > + yarn.resourcemanager.configuration.provider-class org.apache.hadoop.yarn.LocalConfigurationProvider + + The setting that controls whether yarn system metrics is + published on the timeline server or not by RM. + yarn.resourcemanager.metrics-publisher.enabled + false + + + + Number of worker threads that send the yarn system metrics + data. + yarn.resourcemanager.metrics-publisher.dispatcher.pool-size + 10 + + The hostname of the NM. @@ -1307,38 +1315,6 @@ /etc/krb5.keytab - - Indicate to ResourceManager as well as clients whether - history-service is enabled or not. 
If enabled, ResourceManager starts - recording historical data that ApplicationHistory service can consume. - Similarly, clients can redirect to the history service when applications - finish if this is enabled. - yarn.timeline-service.generic-application-history.enabled - false - - - - URI pointing to the location of the FileSystem path where - the history will be persisted. This must be supplied when using - org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore - as the value for yarn.timeline-service.generic-application-history.store-class - yarn.timeline-service.generic-application-history.fs-history-store.uri - ${hadoop.tmp.dir}/yarn/timeline/generic-history - - - - T-file compression types used to compress history data. - yarn.timeline-service.generic-application-history.fs-history-store.compression-type - none - - - - Store class name for history store, defaulting to file - system store - yarn.timeline-service.generic-application-history.store-class - org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore - - The interval that the yarn client library uses to poll the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java index b56a595..803dc01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java @@ -163,7 +163,7 @@ private ApplicationAttemptReport convertToApplicationAttemptReport( ApplicationAttemptHistoryData appAttemptHistory) { return ApplicationAttemptReport.newInstance( appAttemptHistory.getApplicationAttemptId(), appAttemptHistory.getHost(), - appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(), + appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(), null, appAttemptHistory.getDiagnosticsInfo(), appAttemptHistory.getYarnApplicationAttemptState(), appAttemptHistory.getMasterContainerId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java new file mode 100644 index 0000000..23b9218 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import 
org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +public class ApplicationHistoryManagerOnTimelineStore extends AbstractService + implements + ApplicationHistoryManager { + + private TimelineDataManager timelineDataManager; + private String serverHttpAddress; + + public ApplicationHistoryManagerOnTimelineStore( + TimelineDataManager timelineDataManager) { + super(ApplicationHistoryManagerOnTimelineStore.class.getName()); + this.timelineDataManager = timelineDataManager; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + serverHttpAddress = WebAppUtils.getHttpSchemePrefix(conf) + + WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + super.serviceInit(conf); + } + + @Override + public ApplicationReport getApplication(ApplicationId appId) + throws YarnException, IOException { + return getApplication(appId, ApplicationReportField.ALL); + } + + @Override + public Map getAllApplications() + throws YarnException, IOException { + TimelineEntities entities = timelineDataManager.getEntities( + ApplicationMetricsConstants.ENTITY_TYPE, null, null, null, null, + null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class), + UserGroupInformation.getLoginUser()); + Map apps = + new HashMap(); + if (entities != null && entities.getEntities() != null) { + for (TimelineEntity entity : entities.getEntities()) { + ApplicationReport app = + 
generateApplicationReport(entity, ApplicationReportField.ALL); + apps.put(app.getApplicationId(), app); + } + } + return apps; + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptReport> + getApplicationAttempts(ApplicationId appId) + throws YarnException, IOException { + TimelineEntities entities = timelineDataManager.getEntities( + AppAttemptMetricsConstants.ENTITY_TYPE, + new NameValuePair( + AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, appId + .toString()), null, null, null, null, null, + Long.MAX_VALUE, EnumSet.allOf(Field.class), + UserGroupInformation.getLoginUser()); + Map<ApplicationAttemptId, ApplicationAttemptReport> appAttempts = + new HashMap<ApplicationAttemptId, ApplicationAttemptReport>(); + if (entities != null && entities.getEntities() != null) { + for (TimelineEntity entity : entities.getEntities()) { + ApplicationAttemptReport appAttempt = + convertToApplicationAttemptReport(entity); + appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt); + } + } else { + // It is likely that the attempts are not found due to non-existing + // application. In this case, we need to throw ApplicationNotFoundException. 
+ getApplication(appId, ApplicationReportField.NONE); + } + return appAttempts; + } + + @Override + public ApplicationAttemptReport getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws YarnException, IOException { + TimelineEntity entity = timelineDataManager.getEntity( + AppAttemptMetricsConstants.ENTITY_TYPE, + appAttemptId.toString(), EnumSet.allOf(Field.class), + UserGroupInformation.getLoginUser()); + if (entity == null) { + // Will throw ApplicationNotFoundException first + getApplication(appAttemptId.getApplicationId(), ApplicationReportField.NONE); + throw new ApplicationAttemptNotFoundException( + "The entity for application attempt " + appAttemptId + + " doesn't exist in the timeline store"); + } else { + return convertToApplicationAttemptReport(entity); + } + } + + @Override + public ContainerReport getContainer(ContainerId containerId) + throws YarnException, IOException { + ApplicationReport app = getApplication( + containerId.getApplicationAttemptId().getApplicationId(), + ApplicationReportField.USER); + TimelineEntity entity = timelineDataManager.getEntity( + ContainerMetricsConstants.ENTITY_TYPE, + containerId.toString(), EnumSet.allOf(Field.class), + UserGroupInformation.getLoginUser()); + if (entity == null) { + throw new ContainerNotFoundException( + "The entity for container " + containerId + + " doesn't exist in the timeline store"); + } else { + return convertToContainerReport(entity, serverHttpAddress, app.getUser()); + } + } + + @Override + public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) + throws YarnException, IOException { + ApplicationAttemptReport appAttempt = getApplicationAttempt(appAttemptId); + return getContainer(appAttempt.getAMContainerId()); + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws YarnException, IOException { + ApplicationReport app = getApplication( + appAttemptId.getApplicationId(), ApplicationReportField.USER); + TimelineEntities entities 
= timelineDataManager.getEntities( + ContainerMetricsConstants.ENTITY_TYPE, + new NameValuePair( + ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + appAttemptId.toString()), null, null, null, + null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class), + UserGroupInformation.getLoginUser()); + Map containers = + new HashMap(); + if (entities != null && entities.getEntities() != null) { + for (TimelineEntity entity : entities.getEntities()) { + ContainerReport container = + convertToContainerReport(entity, serverHttpAddress, app.getUser()); + containers.put(container.getContainerId(), container); + } + } + return containers; + } + + private static ApplicationReport convertToApplicationReport( + TimelineEntity entity, ApplicationReportField field) { + String user = null; + String queue = null; + String name = null; + String type = null; + long createdTime = 0; + long finishedTime = 0; + ApplicationAttemptId latestApplicationAttemptId = null; + String diagnosticsInfo = null; + FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED; + YarnApplicationState state = null; + if (field == ApplicationReportField.NONE) { + return ApplicationReport.newInstance( + ConverterUtils.toApplicationId(entity.getEntityId()), + latestApplicationAttemptId, user, queue, name, null, -1, null, state, + diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null, + null, 1.0F, type, null); + } + Map entityInfo = entity.getOtherInfo(); + if (entityInfo != null) { + if (entityInfo.containsKey(ApplicationMetricsConstants.USER_ENTITY_INFO)) { + user = + entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO) + .toString(); + } + if (field == ApplicationReportField.USER) { + return ApplicationReport.newInstance( + ConverterUtils.toApplicationId(entity.getEntityId()), + latestApplicationAttemptId, user, queue, name, null, -1, null, state, + diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null, + null, 1.0F, type, null); + } + if 
(entityInfo.containsKey(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) { + queue = + entityInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey(ApplicationMetricsConstants.NAME_ENTITY_INFO)) { + name = + entityInfo.get(ApplicationMetricsConstants.NAME_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey(ApplicationMetricsConstants.TYPE_ENTITY_INFO)) { + type = + entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO) + .toString(); + } + } + List events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getEventType().equals( + ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + createdTime = event.getTimestamp(); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + finishedTime = event.getTimestamp(); + Map eventInfo = event.getEventInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo + .containsKey(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)) { + latestApplicationAttemptId = + ConverterUtils + .toApplicationAttemptId( + eventInfo + .get( + ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO) + .toString()); + } + if (eventInfo + .containsKey(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) { + diagnosticsInfo = + eventInfo.get( + ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO) + .toString(); + } + if (eventInfo + .containsKey(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)) { + finalStatus = + FinalApplicationStatus.valueOf(eventInfo.get( + ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO) + .toString()); + } + if (eventInfo + .containsKey(ApplicationMetricsConstants.STATE_EVENT_INFO)) { + state = + YarnApplicationState.valueOf(eventInfo.get( + ApplicationMetricsConstants.STATE_EVENT_INFO).toString()); + } + } + } + } + return ApplicationReport.newInstance( + ConverterUtils.toApplicationId(entity.getEntityId()), + latestApplicationAttemptId, user, 
queue, name, null, -1, null, state, + diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null, + null, 1.0F, type, null); + } + + private static ApplicationAttemptReport convertToApplicationAttemptReport( + TimelineEntity entity) { + String host = null; + int rpcPort = -1; + ContainerId amContainerId = null; + String trackingUrl = null; + String originalTrackingUrl = null; + String diagnosticsInfo = null; + YarnApplicationAttemptState state = null; + List events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getEventType().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + Map eventInfo = event.getEventInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo.containsKey(AppAttemptMetricsConstants.HOST_EVENT_INFO)) { + host = + eventInfo.get(AppAttemptMetricsConstants.HOST_EVENT_INFO) + .toString(); + } + if (eventInfo + .containsKey(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO)) { + rpcPort = (Integer) eventInfo.get( + AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO); + } + if (eventInfo + .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)) { + amContainerId = + ConverterUtils.toContainerId(eventInfo.get( + AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO) + .toString()); + } + } else if (event.getEventType().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + Map eventInfo = event.getEventInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo + .containsKey(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)) { + trackingUrl = + eventInfo.get( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO) + .toString(); + } + if (eventInfo + .containsKey(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)) { + originalTrackingUrl = + eventInfo + .get( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO) + .toString(); + } + if (eventInfo + .containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) { + 
diagnosticsInfo = + eventInfo.get( + AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO) + .toString(); + } + if (eventInfo + .containsKey(AppAttemptMetricsConstants.STATE_EVENT_INFO)) { + state = + YarnApplicationAttemptState.valueOf(eventInfo.get( + AppAttemptMetricsConstants.STATE_EVENT_INFO) + .toString()); + } + } + } + } + return ApplicationAttemptReport.newInstance( + ConverterUtils.toApplicationAttemptId(entity.getEntityId()), + host, rpcPort, trackingUrl, originalTrackingUrl, diagnosticsInfo, + state, amContainerId); + } + + private static ContainerReport convertToContainerReport( + TimelineEntity entity, String serverHttpAddress, String user) { + int allocatedMem = 0; + int allocatedVcore = 0; + String allocatedHost = null; + int allocatedPort = -1; + int allocatedPriority = 0; + long createdTime = 0; + long finishedTime = 0; + String diagnosticsInfo = null; + int exitStatus = ContainerExitStatus.INVALID; + ContainerState state = null; + Map entityInfo = entity.getOtherInfo(); + if (entityInfo != null) { + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO)) { + allocatedMem = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO)) { + allocatedVcore = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)) { + allocatedHost = + entityInfo + .get(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO) + .toString(); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO)) { + allocatedPort = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO)) { + allocatedPriority = (Integer) entityInfo.get( + 
ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO); + } + } + List events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getEventType().equals( + ContainerMetricsConstants.CREATED_EVENT_TYPE)) { + createdTime = event.getTimestamp(); + } else if (event.getEventType().equals( + ContainerMetricsConstants.FINISHED_EVENT_TYPE)) { + finishedTime = event.getTimestamp(); + Map eventInfo = event.getEventInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo + .containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) { + diagnosticsInfo = + eventInfo.get( + ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO) + .toString(); + } + if (eventInfo + .containsKey(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO)) { + exitStatus = (Integer) eventInfo.get( + ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO); + } + if (eventInfo + .containsKey(ContainerMetricsConstants.STATE_EVENT_INFO)) { + state = + ContainerState.valueOf(eventInfo.get( + ContainerMetricsConstants.STATE_EVENT_INFO).toString()); + } + } + } + } + NodeId allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort); + ContainerId containerId = + ConverterUtils.toContainerId(entity.getEntityId()); + String logUrl = WebAppUtils.getAggregatedLogURL( + serverHttpAddress, + allocatedNode.toString(), + containerId.toString(), + containerId.toString(), + user); + return ContainerReport.newInstance( + ConverterUtils.toContainerId(entity.getEntityId()), + Resource.newInstance(allocatedMem, allocatedVcore), + NodeId.newInstance(allocatedHost, allocatedPort), + Priority.newInstance(allocatedPriority), + createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state); + } + + private ApplicationReport generateApplicationReport(TimelineEntity entity, + ApplicationReportField field) throws YarnException, IOException { + ApplicationReport app = convertToApplicationReport(entity, field); + if (field == ApplicationReportField.ALL && + app != 
null && app.getCurrentApplicationAttemptId() != null) { + ApplicationAttemptReport appAttempt = + getApplicationAttempt(app.getCurrentApplicationAttemptId()); + if (appAttempt != null) { + app.setHost(appAttempt.getHost()); + app.setRpcPort(appAttempt.getRpcPort()); + app.setTrackingUrl(appAttempt.getTrackingUrl()); + app.setOriginalTrackingUrl(appAttempt.getOriginalTrackingUrl()); + } + } + return app; + } + + private ApplicationReport getApplication(ApplicationId appId, + ApplicationReportField field) throws YarnException, IOException { + TimelineEntity entity = timelineDataManager.getEntity( + ApplicationMetricsConstants.ENTITY_TYPE, + appId.toString(), EnumSet.allOf(Field.class), + UserGroupInformation.getLoginUser()); + if (entity == null) { + throw new ApplicationNotFoundException("The entity for application " + + appId + " doesn't exist in the timeline store"); + } else { + return generateApplicationReport(entity, field); + } + } + + private static enum ApplicationReportField { + ALL, // retrieve all the fields + NONE, // retrieve no fields + USER // retrieve user info only + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 158f2e6..c0ea94b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -170,7 +170,15 @@ public static void main(String[] args) 
{ private ApplicationHistoryManager createApplicationHistoryManager( Configuration conf) { - return new ApplicationHistoryManagerImpl(); + // Backward compatibility: + // APPLICATION_HISTORY_STORE is not null, it means that the user has enabled + // it explicitly. + if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null) { + return new ApplicationHistoryManagerOnTimelineStore(timelineDataManager); + } else { + LOG.warn("The filesystem based application history store is deprecated."); + return new ApplicationHistoryManagerImpl(); + } } private TimelineStore createTimelineStore( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java index a2d9140..7840dd1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java @@ -117,7 +117,8 @@ protected FileSystem getFileSystem(Path path, Configuration conf) throws Excepti @Override public void serviceInit(Configuration conf) throws Exception { Path fsWorkingPath = - new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI)); + new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI, + conf.get("hadoop.tmp.dir") + "/yarn/timeline/generic-history")); rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); try { fs = getFileSystem(fsWorkingPath, conf); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java index b94711c..099d5ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java @@ -49,6 +49,8 @@ * they may encounter reading and writing history data in different memory * store. * + * The methods are synchronized to avoid concurrent modification on the memory. + * */ @Private @Unstable @@ -65,7 +67,7 @@ public MemoryTimelineStore() { } @Override - public TimelineEntities getEntities(String entityType, Long limit, + public synchronized TimelineEntities getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection secondaryFilters, EnumSet fields) { @@ -148,7 +150,7 @@ public TimelineEntities getEntities(String entityType, Long limit, } @Override - public TimelineEntity getEntity(String entityId, String entityType, + public synchronized TimelineEntity getEntity(String entityId, String entityType, EnumSet fieldsToRetrieve) { if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.allOf(Field.class); @@ -162,7 +164,7 @@ public TimelineEntity getEntity(String entityId, String entityType, } @Override - public TimelineEvents getEntityTimelines(String entityType, + public synchronized TimelineEvents getEntityTimelines(String entityType, SortedSet entityIds, Long limit, Long windowStart, Long windowEnd, Set eventTypes) { @@ -209,7 +211,7 @@ 
public TimelineEvents getEntityTimelines(String entityType, } @Override - public TimelinePutResponse put(TimelineEntities data) { + public synchronized TimelinePutResponse put(TimelineEntities data) { TimelinePutResponse response = new TimelinePutResponse(); for (TimelineEntity entity : data.getEntities()) { EntityIdentifier entityId = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java index 7b5b74b..ad2907b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java @@ -64,6 +64,7 @@ public void setup() { WebAppUtils.getAHSWebAppURLWithoutScheme(config) + "/applicationhistory/logs/localhost:0/container_0_0001_01_000001/" + "container_0_0001_01_000001/test user"; + config.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); config.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE, MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); historyServer.init(config); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java new file mode 100644 index 0000000..65eafc6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; +import org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestApplicationHistoryManagerOnTimelineStore { + + private static ApplicationHistoryManagerOnTimelineStore historyManager; + private static final int SCALE = 5; + + 
@BeforeClass + public static void setup() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + TimelineStore store = new MemoryTimelineStore(); + prepareTimelineStore(store); + TimelineACLsManager aclsManager = new TimelineACLsManager(conf); + TimelineDataManager dataManager = + new TimelineDataManager(store, aclsManager); + historyManager = new ApplicationHistoryManagerOnTimelineStore(dataManager); + historyManager.init(conf); + historyManager.start(); + } + + @AfterClass + public static void tearDown() { + if (historyManager != null) { + historyManager.stop(); + } + } + + private static void prepareTimelineStore(TimelineStore store) + throws Exception { + for (int i = 1; i <= SCALE; ++i) { + TimelineEntities entities = new TimelineEntities(); + ApplicationId appId = ApplicationId.newInstance(0, i); + entities.addEntity(createApplicationTimelineEntity(appId)); + store.put(entities); + for (int j = 1; j <= SCALE; ++j) { + entities = new TimelineEntities(); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, j); + entities.addEntity(createAppAttemptTimelineEntity(appAttemptId)); + store.put(entities); + for (int k = 1; k <= SCALE; ++k) { + entities = new TimelineEntities(); + ContainerId containerId = ContainerId.newInstance(appAttemptId, k); + entities.addEntity(createContainerEntity(containerId)); + store.put(entities); + } + } + } + } + + @Test + public void testGetApplicationReport() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationReport app = historyManager.getApplication(appId); + Assert.assertNotNull(app); + Assert.assertEquals(appId, app.getApplicationId()); + Assert.assertEquals("test app", app.getName()); + Assert.assertEquals("test app type", app.getApplicationType()); + Assert.assertEquals("test user", app.getUser()); + Assert.assertEquals("test queue", app.getQueue()); + Assert.assertEquals(Integer.MAX_VALUE + 2L, app.getStartTime()); + 
Assert.assertEquals(Integer.MAX_VALUE + 3L, app.getFinishTime()); + Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001); + Assert.assertEquals("test host", app.getHost()); + Assert.assertEquals(-100, app.getRpcPort()); + Assert.assertEquals("test tracking url", app.getTrackingUrl()); + Assert.assertEquals("test original tracking url", + app.getOriginalTrackingUrl()); + Assert.assertEquals("test diagnostics info", app.getDiagnostics()); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + app.getFinalApplicationStatus()); + Assert.assertEquals(YarnApplicationState.FINISHED, + app.getYarnApplicationState()); + } + + @Test + public void testGetApplicationAttemptReport() throws Exception { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); + ApplicationAttemptReport appAttempt = + historyManager.getApplicationAttempt(appAttemptId); + Assert.assertNotNull(appAttempt); + Assert.assertEquals(appAttemptId, appAttempt.getApplicationAttemptId()); + Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1), + appAttempt.getAMContainerId()); + Assert.assertEquals("test host", appAttempt.getHost()); + Assert.assertEquals(-100, appAttempt.getRpcPort()); + Assert.assertEquals("test tracking url", appAttempt.getTrackingUrl()); + Assert.assertEquals("test original tracking url", + appAttempt.getOriginalTrackingUrl()); + Assert.assertEquals("test diagnostics info", appAttempt.getDiagnostics()); + Assert.assertEquals(YarnApplicationAttemptState.FINISHED, + appAttempt.getYarnApplicationAttemptState()); + } + + @Test + public void testGetContainerReport() throws Exception { + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1); + ContainerReport container = historyManager.getContainer(containerId); + Assert.assertNotNull(container); + Assert.assertEquals(Integer.MAX_VALUE + 1L, container.getCreationTime()); + 
Assert.assertEquals(Integer.MAX_VALUE + 2L, container.getFinishTime()); + Assert.assertEquals(Resource.newInstance(-1, -1), + container.getAllocatedResource()); + Assert.assertEquals(NodeId.newInstance("test host", -100), + container.getAssignedNode()); + Assert.assertEquals(Priority.UNDEFINED, container.getPriority()); + Assert + .assertEquals("test diagnostics info", container.getDiagnosticsInfo()); + Assert.assertEquals(ContainerState.COMPLETE, container.getContainerState()); + Assert.assertEquals(-1, container.getContainerExitStatus()); + Assert.assertEquals("http://0.0.0.0:8188/applicationhistory/logs/" + + "test host:-100/container_0_0001_01_000001/" + + "container_0_0001_01_000001/test user", container.getLogUrl()); + } + + @Test + public void testGetApplications() throws Exception { + Collection apps = + historyManager.getAllApplications().values(); + Assert.assertNotNull(apps); + Assert.assertEquals(SCALE, apps.size()); + } + + @Test + public void testGetApplicationAttempts() throws Exception { + Collection appAttempts = + historyManager.getApplicationAttempts(ApplicationId.newInstance(0, 1)) + .values(); + Assert.assertNotNull(appAttempts); + Assert.assertEquals(SCALE, appAttempts.size()); + } + + @Test + public void testGetContainers() throws Exception { + Collection containers = + historyManager + .getContainers( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)).values(); + Assert.assertNotNull(containers); + Assert.assertEquals(SCALE, containers.size()); + } + + @Test + public void testGetAMContainer() throws Exception { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); + ContainerReport container = historyManager.getAMContainer(appAttemptId); + Assert.assertNotNull(container); + Assert.assertEquals(appAttemptId, container.getContainerId() + .getApplicationAttemptId()); + } + + private static TimelineEntity createApplicationTimelineEntity( + ApplicationId appId) { + 
TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appId.toString()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app"); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + "test app type"); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "test user"); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, "test queue"); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + Integer.MAX_VALUE + 1L); + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L); + entity.addEvent(tEvent); + tEvent = new TimelineEvent(); + tEvent.setEventType( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 3L); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + "test diagnostics info"); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + FinalApplicationStatus.UNDEFINED.toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + YarnApplicationState.FINISHED.toString()); + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + ApplicationAttemptId.newInstance(appId, 1)); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + return entity; + } + + private static TimelineEntity createAppAttemptTimelineEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appAttemptId.toString()); + entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, + appAttemptId.getApplicationId().toString()); + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 1L); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + "test tracking url"); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + "test original tracking url"); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, "test host"); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, -100); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + ContainerId.newInstance(appAttemptId, 1)); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L); + eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + "test tracking url"); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + "test original tracking url"); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + "test diagnostics info"); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + FinalApplicationStatus.UNDEFINED.toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, + YarnApplicationAttemptState.FINISHED.toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + return entity; + } + + private static TimelineEntity createContainerEntity(ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE); + entity.setEntityId(containerId.toString()); + entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + containerId.getApplicationAttemptId().toString()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, -1); + 
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, -1); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + "test host"); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, -100); + entityInfo + .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1); + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 1L); + entity.addEvent(tEvent); + ; + tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + "test diagnostics info"); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, -1); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + ContainerState.COMPLETE.toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + return entity; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationContext.java index 78ae0dd..0e2ffdf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationContext.java @@ -21,17 +21,14 @@ import java.io.IOException; import java.util.Map; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.exceptions.YarnException; -@Public -@Unstable public interface ApplicationContext { /** * This method returns Application {@link ApplicationReport} for the specified @@ -40,21 +37,21 @@ * @param appId * * @return {@link ApplicationReport} for the ApplicationId. + * @throws YarnException * @throws IOException */ - @Public - @Unstable - ApplicationReport getApplication(ApplicationId appId) throws IOException; + ApplicationReport getApplication(ApplicationId appId) + throws YarnException, IOException; /** * This method returns all Application {@link ApplicationReport}s * * @return map of {@link ApplicationId} to {@link ApplicationReport}s. + * @throws YarnException * @throws IOException */ - @Public - @Unstable - Map getAllApplications() throws IOException; + Map getAllApplications() + throws YarnException, IOException; /** * Application can have multiple application attempts @@ -64,12 +61,11 @@ * @param appId * * @return all {@link ApplicationAttemptReport}s for the Application. 
+ * @throws YarnException * @throws IOException */ - @Public - @Unstable Map getApplicationAttempts( - ApplicationId appId) throws IOException; + ApplicationId appId) throws YarnException, IOException; /** * This method returns {@link ApplicationAttemptReport} for specified @@ -78,12 +74,11 @@ * @param appAttemptId * {@link ApplicationAttemptId} * @return {@link ApplicationAttemptReport} for ApplicationAttemptId + * @throws YarnException * @throws IOException */ - @Public - @Unstable ApplicationAttemptReport getApplicationAttempt( - ApplicationAttemptId appAttemptId) throws IOException; + ApplicationAttemptId appAttemptId) throws YarnException, IOException; /** * This method returns {@link ContainerReport} for specified @@ -92,11 +87,11 @@ ApplicationAttemptReport getApplicationAttempt( * @param containerId * {@link ContainerId} * @return {@link ContainerReport} for ContainerId + * @throws YarnException * @throws IOException */ - @Public - @Unstable - ContainerReport getContainer(ContainerId containerId) throws IOException; + ContainerReport getContainer(ContainerId containerId) + throws YarnException, IOException; /** * This method returns {@link ContainerReport} for specified @@ -105,12 +100,11 @@ ApplicationAttemptReport getApplicationAttempt( * @param appAttemptId * {@link ApplicationAttemptId} * @return {@link ContainerReport} for ApplicationAttemptId + * @throws YarnException * @throws IOException */ - @Public - @Unstable ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) - throws IOException; + throws YarnException, IOException; /** * This method returns Map of {@link ContainerId} to {@link ContainerReport} @@ -120,10 +114,9 @@ ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) * {@link ApplicationAttemptId} * @return Map of {@link ContainerId} to {@link ContainerReport} for * ApplicationAttemptId + * @throws YarnException * @throws IOException */ - @Public - @Unstable Map getContainers( - ApplicationAttemptId appAttemptId) 
throws IOException; + ApplicationAttemptId appAttemptId) throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java new file mode 100644 index 0000000..a7809cf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.metrics; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public class AppAttemptMetricsConstants { + + public static final String ENTITY_TYPE = + "YARN_APPLICATION_ATTEMPT"; + + public static final String REGISTERED_EVENT_TYPE = + "YARN_APPLICATION_ATTEMPT_REGISTERED"; + + public static final String FINISHED_EVENT_TYPE = + "YARN_APPLICATION_ATTEMPT_FINISHED"; + + public static final String PARENT_PRIMARY_FILTER = + "YARN_APPLICATION_ATTEMPT_PARENT"; + + public static final String TRACKING_URL_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_TRACKING_URL"; + + public static final String ORIGINAL_TRACKING_URL_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL"; + + public static final String HOST_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_HOST"; + + public static final String RPC_PORT_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_RPC_PORT"; + + public static final String MASTER_CONTAINER_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER"; + + public static final String DIAGNOSTICS_INFO_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_DIAGNOSTICS_INFO"; + + public static final String FINAL_STATUS_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_FINAL_STATUS"; + + public static final String STATE_EVENT_INFO = + "YARN_APPLICATION_ATTEMPT_STATE"; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java new file mode 100644 index 0000000..f6a40bd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java @@ -0,0 +1,64 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.metrics; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public class ApplicationMetricsConstants { + + public static final String ENTITY_TYPE = + "YARN_APPLICATION"; + + public static final String CREATED_EVENT_TYPE = + "YARN_APPLICATION_CREATED"; + + public static final String FINISHED_EVENT_TYPE = + "YARN_APPLICATION_FINISHED"; + + public static final String NAME_ENTITY_INFO = + "YARN_APPLICATION_NAME"; + + public static final String TYPE_ENTITY_INFO = + "YARN_APPLICATION_TYPE"; + + public static final String USER_ENTITY_INFO = + "YARN_APPLICATION_USER"; + + public static final String QUEUE_ENTITY_INFO = + "YARN_APPLICATION_QUEUE"; + + public static final String SUBMITTED_TIME_ENTITY_INFO = + "YARN_APPLICATION_SUBMITTED_TIME"; + + public static final String DIAGNOSTICS_INFO_EVENT_INFO = + "YARN_APPLICATION_DIAGNOSTICS_INFO"; + + public static final String FINAL_STATUS_EVENT_INFO = + "YARN_APPLICATION_FINAL_STATUS"; + + public static final String STATE_EVENT_INFO = + "YARN_APPLICATION_STATE"; + + public static 
final String LATEST_APP_ATTEMPT_EVENT_INFO = + "YARN_APPLICATION_LATEST_APP_ATTEMPT"; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java new file mode 100644 index 0000000..8791da4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.metrics; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public class ContainerMetricsConstants { + + public static final String ENTITY_TYPE = "YARN_CONTAINER"; + + public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED"; + + public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED"; + + public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT"; + + public static final String ALLOCATED_MEMORY_ENTITY_INFO = + "YARN_CONTAINER_ALLOCATED_MEMORY"; + + public static final String ALLOCATED_VCORE_ENTITY_INFO = + "YARN_CONTAINER_ALLOCATED_VCORE"; + + public static final String ALLOCATED_HOST_ENTITY_INFO = + "YARN_CONTAINER_ALLOCATED_HOST"; + + public static final String ALLOCATED_PORT_ENTITY_INFO = + "YARN_CONTAINER_ALLOCATED_PORT"; + + public static final String ALLOCATED_PRIORITY_ENTITY_INFO = + "YARN_CONTAINER_ALLOCATED_PRIORITY"; + + public static final String DIAGNOSTICS_INFO_EVENT_INFO = + "YARN_CONTAINER_DIAGNOSTICS_INFO"; + + public static final String EXIT_STATUS_EVENT_INFO = + "YARN_CONTAINER_EXIT_STATUS"; + + public static final String STATE_EVENT_INFO = + "YARN_CONTAINER_STATE"; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppAttemptBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppAttemptBlock.java index 1a76eca..4a02892 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppAttemptBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppAttemptBlock.java @@ -20,12 +20,13 @@ import static 
org.apache.hadoop.yarn.util.StringHelper.join; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ATTEMPT_ID; -import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Collection; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ContainerReport; @@ -67,10 +68,22 @@ protected void render(Block html) { return; } + final ApplicationAttemptId appAttemptIdFinal = appAttemptId; + UserGroupInformation callerUGI = getCallerUGI(); ApplicationAttemptReport appAttemptReport; try { - appAttemptReport = appContext.getApplicationAttempt(appAttemptId); - } catch (IOException e) { + if (callerUGI == null) { + appAttemptReport = appContext.getApplicationAttempt(appAttemptId); + } else { + appAttemptReport = callerUGI.doAs( + new PrivilegedExceptionAction () { + @Override + public ApplicationAttemptReport run() throws Exception { + return appContext.getApplicationAttempt(appAttemptIdFinal); + } + }); + } + } catch (Exception e) { String message = "Failed to read the application attempt " + appAttemptId + "."; LOG.error(message, e); @@ -108,8 +121,26 @@ protected void render(Block html) { Collection containers; try { - containers = appContext.getContainers(appAttemptId).values(); - } catch (IOException e) { + if (callerUGI == null) { + containers = appContext.getContainers(appAttemptId).values(); + } else { + containers = callerUGI.doAs( + new PrivilegedExceptionAction> () { + @Override + public Collection run() throws Exception { + return appContext.getContainers(appAttemptIdFinal).values(); + } + }); + } + } catch (RuntimeException e) { + // have this block to suppress the findbugs warning + html + .p() + ._( + 
"Sorry, Failed to get containers for application attempt" + attemptid + + ".")._(); + return; + } catch (Exception e) { html .p() ._( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java index 2ae495b..8fa4086 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java @@ -21,10 +21,11 @@ import static org.apache.hadoop.yarn.util.StringHelper.join; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID; -import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Collection; import org.apache.commons.lang.StringEscapeUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -70,10 +71,22 @@ protected void render(Block html) { return; } + final ApplicationId appIDFinal = appID; + UserGroupInformation callerUGI = getCallerUGI(); ApplicationReport appReport; try { - appReport = appContext.getApplication(appID); - } catch (IOException e) { + if (callerUGI == null) { + appReport = appContext.getApplication(appID); + } else { + appReport = callerUGI.doAs( + new PrivilegedExceptionAction () { + @Override + public ApplicationReport run() throws Exception { + return appContext.getApplication(appIDFinal); + } + }); + } + } catch (Exception e) { String message = "Failed to read the application " + appID + "."; LOG.error(message, e); html.p()._(message)._(); @@ -106,8 +119,18 @@ 
protected void render(Block html) { Collection attempts; try { - attempts = appContext.getApplicationAttempts(appID).values(); - } catch (IOException e) { + if (callerUGI == null) { + attempts = appContext.getApplicationAttempts(appID).values(); + } else { + attempts = callerUGI.doAs( + new PrivilegedExceptionAction> () { + @Override + public Collection run() throws Exception { + return appContext.getApplicationAttempts(appIDFinal).values(); + } + }); + } + } catch (Exception e) { String message = "Failed to read the attempts of the application " + appID + "."; LOG.error(message, e); @@ -122,14 +145,24 @@ protected void render(Block html) { ._()._().tbody(); StringBuilder attemptsTableData = new StringBuilder("[\n"); - for (ApplicationAttemptReport appAttemptReport : attempts) { + for (final ApplicationAttemptReport appAttemptReport : attempts) { AppAttemptInfo appAttempt = new AppAttemptInfo(appAttemptReport); ContainerReport containerReport; try { - containerReport = - appContext.getAMContainer(appAttemptReport + if (callerUGI == null) { + containerReport = appContext.getAMContainer(appAttemptReport .getApplicationAttemptId()); - } catch (IOException e) { + } else { + containerReport = callerUGI.doAs( + new PrivilegedExceptionAction () { + @Override + public ContainerReport run() throws Exception { + return appContext.getAMContainer(appAttemptReport + .getApplicationAttemptId()); + } + }); + } + } catch (Exception e) { String message = "Failed to read the AM container of the application attempt " + appAttemptReport.getApplicationAttemptId() + "."; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppsBlock.java index d4a77a8..f341cf6 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppsBlock.java @@ -23,11 +23,12 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE; -import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.HashSet; import org.apache.commons.lang.StringEscapeUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.server.api.ApplicationContext; @@ -70,10 +71,21 @@ public void render(Block html) { } } + UserGroupInformation callerUGI = getCallerUGI(); Collection appReports; try { - appReports = appContext.getAllApplications().values(); - } catch (IOException e) { + if (callerUGI == null) { + appReports = appContext.getAllApplications().values(); + } else { + appReports = callerUGI.doAs( + new PrivilegedExceptionAction> () { + @Override + public Collection run() throws Exception { + return appContext.getAllApplications().values(); + } + }); + } + } catch (Exception e) { String message = "Failed to read the applications."; LOG.error(message, e); html.p()._(message)._(); @@ -86,7 +98,7 @@ public void render(Block html) { continue; } AppInfo app = new AppInfo(appReport); - String percent = String.format("%.1f", app.getProgress()); + String percent = String.format("%.1f", app.getProgress() * 100.0F); // AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js appsTableData .append("[\" { + + public SystemMetricsEvent(SystemMetricsEventType type) { + super(type); + } + + public SystemMetricsEvent(SystemMetricsEventType type, long 
timestamp) { + super(type, timestamp); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java new file mode 100644 index 0000000..c593f69 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Types of events handled by the system-metrics publisher's dispatcher.
 * Each value corresponds to one life-cycle transition of an application,
 * application attempt, or container that gets written to the timeline
 * server.
 */
public enum SystemMetricsEventType {
  // app events
  APP_CREATED,
  APP_FINISHED,

  // app attempt events
  APP_ATTEMPT_REGISTERED,
  APP_ATTEMPT_FINISHED,

  // container events
  CONTAINER_CREATED,
  CONTAINER_FINISHED
}
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import 
org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +@Private +@Unstable +public class SystemMetricsPublisher extends CompositeService { + + private static final Log LOG = LogFactory + .getLog(SystemMetricsPublisher.class); + private static final int MAX_GET_TIMELINE_DELEGATION_TOKEN_ATTEMPTS = 10; + + private Dispatcher dispatcher; + private TimelineClient client; + private boolean publishSystemMetrics; + private int getTimelineDelegtionTokenAttempts = 0; + private boolean hasReceivedTimelineDelegtionToken = false; + + public SystemMetricsPublisher() { + super(SystemMetricsPublisher.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + publishSystemMetrics = + conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) && + conf.getBoolean(YarnConfiguration.RM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_RM_METRICS_PUBLISHER_ENABLED); + + if (publishSystemMetrics) { + client = TimelineClient.createTimelineClient(); + addIfService(client); + + dispatcher = createDispatcher(conf); + dispatcher.register(SystemMetricsEventType.class, + new ForwardingEventHandler()); + addIfService(dispatcher); + LOG.info("YARN system metrics publishing service is enabled"); + } else { + LOG.info("YARN system metrics publishing service is not enabled"); + } + super.serviceInit(conf); + } + + @SuppressWarnings("unchecked") + public void appCreated(RMApp app, long createdTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new ApplicationCreatedEvent( + app.getApplicationId(), + app.getName(), + app.getApplicationType(), + app.getUser(), + app.getQueue(), + app.getSubmitTime(), + createdTime)); + } + } + + @SuppressWarnings("unchecked") + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new ApplicationFinishedEvent( + app.getApplicationId(), + 
app.getDiagnostics().toString(), + app.getFinalApplicationStatus(), + RMServerUtils.createApplicationState(state), + app.getCurrentAppAttempt() == null ? + null : app.getCurrentAppAttempt().getAppAttemptId(), + finishedTime)); + } + } + + @SuppressWarnings("unchecked") + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new AppAttemptRegisteredEvent( + appAttempt.getAppAttemptId(), + appAttempt.getHost(), + appAttempt.getRpcPort(), + appAttempt.getTrackingUrl(), + appAttempt.getOriginalTrackingUrl(), + appAttempt.getMasterContainer().getId(), + registeredTime)); + } + } + + @SuppressWarnings("unchecked") + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState state, long finishedTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new AppAttemptFinishedEvent( + appAttempt.getAppAttemptId(), + appAttempt.getTrackingUrl(), + appAttempt.getOriginalTrackingUrl(), + appAttempt.getDiagnostics(), + appAttempt.getFinalApplicationStatus(), + RMServerUtils.createApplicationAttemptState(state), + finishedTime)); + } + } + + @SuppressWarnings("unchecked") + public void containerCreated(RMContainer container, long createdTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new ContainerCreatedEvent( + container.getContainerId(), + container.getAllocatedResource(), + container.getAllocatedNode(), + container.getAllocatedPriority(), + createdTime)); + } + } + + @SuppressWarnings("unchecked") + public void containerFinished(RMContainer container, long finishedTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new ContainerFinishedEvent( + container.getContainerId(), + container.getDiagnosticsInfo(), + container.getContainerExitStatus(), + container.getContainerState(), + finishedTime)); + } + } + + protected Dispatcher createDispatcher(Configuration conf) { + 
MultiThreadedDispatcher dispatcher = + new MultiThreadedDispatcher( + conf.getInt( + YarnConfiguration.RM_METRICS_PUBLISHER_MULTI_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + return dispatcher; + } + + protected void handleSystemMetricsEvent( + SystemMetricsEvent event) { + switch (event.getType()) { + case APP_CREATED: + publishApplicationCreatedEvent((ApplicationCreatedEvent) event); + break; + case APP_FINISHED: + publishApplicationFinishedEvent((ApplicationFinishedEvent) event); + break; + case APP_ATTEMPT_REGISTERED: + publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); + break; + case APP_ATTEMPT_FINISHED: + publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); + break; + case CONTAINER_CREATED: + publishContainerCreatedEvent((ContainerCreatedEvent) event); + break; + case CONTAINER_FINISHED: + publishContainerFinishedEvent((ContainerFinishedEvent) event); + default: + LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); + } + } + + private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { + TimelineEntity entity = + createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, + event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + event.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, + event.getUser()); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void 
publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + TimelineEntity entity = + createApplicationEntity(event.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + event.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + event.getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createApplicationEntity( + ApplicationId applicationId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); + entity.setEntityId(applicationId.toString()); + return entity; + } + + private void + publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + eventInfo.put( + 
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + event.getMasterContainerId().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + event.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, + event.getYarnApplicationAttemptState().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType( + AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appAttemptId.toString()); + entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, + appAttemptId.getApplicationId().toString()); + return entity; + } + + private void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + event.getAllocatedResource().getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + 
event.getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + event.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + event.getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + event.getContainerState().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createContainerEntity( + ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType( + ContainerMetricsConstants.ENTITY_TYPE); + entity.setEntityId(containerId.toString()); + entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + containerId.getApplicationAttemptId().toString()); + return entity; + } + + private void putEntity(TimelineEntity entity) { + if (UserGroupInformation.isSecurityEnabled()) { + if (!hasReceivedTimelineDelegtionToken + && getTimelineDelegtionTokenAttempts < MAX_GET_TIMELINE_DELEGATION_TOKEN_ATTEMPTS) { 
+ try { + Token token = + client.getDelegationToken( + UserGroupInformation.getCurrentUser().getUserName()); + UserGroupInformation.getCurrentUser().addToken(token); + hasReceivedTimelineDelegtionToken = true; + } catch (Exception e) { + LOG.error("Error happens when getting timeline delegation token", e); + } finally { + ++getTimelineDelegtionTokenAttempts; + if (!hasReceivedTimelineDelegtionToken + && getTimelineDelegtionTokenAttempts == MAX_GET_TIMELINE_DELEGATION_TOKEN_ATTEMPTS) { + LOG.error("Run out of the attempts to get timeline delegation token. " + + "Use kerberos authentication only."); + } + } + } + } + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity.getEntityId() + + ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + client.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity [" + entity.getEntityType() + "," + + entity.getEntityId() + "]", e); + } + } + + /** + * EventHandler implementation which forward events to SystemMetricsPublisher. + * Making use of it, SystemMetricsPublisher can avoid to have a public handle + * method. 
+ */ + private final class ForwardingEventHandler implements + EventHandler { + + @Override + public void handle(SystemMetricsEvent event) { + handleSystemMetricsEvent(event); + } + + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected static class MultiThreadedDispatcher extends CompositeService + implements Dispatcher { + + private List dispatchers = + new ArrayList(); + + public MultiThreadedDispatcher(int num) { + super(MultiThreadedDispatcher.class.getName()); + for (int i = 0; i < num; ++i) { + AsyncDispatcher dispatcher = createDispatcher(); + dispatchers.add(dispatcher); + addIfService(dispatcher); + } + } + + @Override + public EventHandler getEventHandler() { + return new CompositEventHandler(); + } + + @Override + public void register(Class eventType, EventHandler handler) { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.register(eventType, handler); + } + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Use hashCode (of ApplicationId) to dispatch the event to the child + // dispatcher, such that all the writing events of one application will + // be handled by one thread, the scheduled order of the these events + // will be preserved + int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); + dispatchers.get(index).getEventHandler().handle(event); + } + + } + + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher(); + } + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 
bf374b4..a1ae3ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -172,6 +172,12 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getTrackingUrl(); /** + * The original tracking url for the application master. + * @return the original tracking url for the application master. + */ + String getOriginalTrackingUrl(); + + /** * the diagnostics information for the application master. * @return the diagnostics information for the application master. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 48cf460..2018197 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -362,6 +362,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.stateMachine = stateMachineFactory.make(this); rmContext.getRMApplicationHistoryWriter().applicationStarted(this); + rmContext.getSystemMetricsPublisher().appCreated(this, startTime); } @Override @@ -621,6 +622,20 @@ public String getTrackingUrl() { } @Override + public String getOriginalTrackingUrl() { + this.readLock.lock(); + + try { + if (this.currentAttempt != null) { + return 
this.currentAttempt.getOriginalTrackingUrl(); + } + return null; + } finally { + this.readLock.unlock(); + } + } + + @Override public StringBuilder getDiagnostics() { this.readLock.lock(); @@ -1089,6 +1104,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { app.rmContext.getRMApplicationHistoryWriter() .applicationFinished(app, finalState); + app.rmContext.getSystemMetricsPublisher() + .appFinished(app, finalState, app.finishTime); }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..4e05e9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1146,6 +1146,9 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getRMApplicationHistoryWriter() .applicationAttemptFinished(appAttempt, finalAttemptState); + appAttempt.rmContext.getSystemMetricsPublisher() + .appAttemptFinished( + appAttempt, finalAttemptState, System.currentTimeMillis()); } } @@ -1259,6 +1262,8 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getRMApplicationHistoryWriter() .applicationAttemptStarted(appAttempt); + appAttempt.rmContext.getSystemMetricsPublisher() + .appAttemptRegistered(appAttempt, System.currentTimeMillis()); } } @@ -1713,8 +1718,8 @@ public ApplicationAttemptReport createApplicationAttemptReport() { masterContainer == null ? 
null : masterContainer.getId(); attemptReport = ApplicationAttemptReport.newInstance(this .getAppAttemptId(), this.getHost(), this.getRpcPort(), this - .getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState - .valueOf(this.getState().toString()), amId); + .getTrackingUrl(), this.getOriginalTrackingUrl(), this.getDiagnostics(), + YarnApplicationAttemptState .valueOf(this.getState().toString()), amId); } finally { this.readLock.unlock(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index eef361f..d923809 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -190,6 +190,8 @@ public RMContainerImpl(Container container, this.writeLock = lock.writeLock(); rmContext.getRMApplicationHistoryWriter().containerStarted(this); + rmContext.getSystemMetricsPublisher().containerCreated( + this, this.creationTime); } @Override @@ -495,6 +497,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.rmContext.getRMApplicationHistoryWriter().containerFinished( container); + container.rmContext.getSystemMetricsPublisher().containerFinished( + container, container.finishTime); } private static void updateMetricsIfPreempted(RMContainerImpl container) { diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index d720eb6..e3f1eba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -119,6 +120,8 @@ public static RMContext mockRMContext(int n, long time) { } }; ((RMContextImpl)context).setStateStore(mock(RMStateStore.class)); + ((RMContextImpl)context).setSystemMetricsPublisher( + mock(SystemMetricsPublisher.class)); return context; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 4f4da37..abfb415 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -102,6 +102,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -1047,6 +1048,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) .thenThrow(new IOException("queue does not exist")); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); ConcurrentHashMap apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index ff60fcd..49fd9f0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -116,6 +116,10 @@ public String getTrackingUrl() { throw new UnsupportedOperationException("Not supported yet."); } @Override + public String getOriginalTrackingUrl() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public int getMaxAppAttempts() { throw new UnsupportedOperationException("Not supported yet."); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystsemMetricsPublisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystsemMetricsPublisher.java new file mode 100644 index 0000000..b57bd3b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystsemMetricsPublisher.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.EnumSet; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; +import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import 
org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSystsemMetricsPublisher { + + private static ApplicationHistoryServer timelineServer; + private static SystemMetricsPublisher metricsPublisher; + private static TimelineStore store; + + @BeforeClass + public static void setup() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_METRICS_PUBLISHER_ENABLED, true); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, + MemoryTimelineStore.class, TimelineStore.class); + conf.setInt( + YarnConfiguration.RM_METRICS_PUBLISHER_MULTI_DISPATCHER_POOL_SIZE, + 2); + + timelineServer = new ApplicationHistoryServer(); + timelineServer.init(conf); + timelineServer.start(); + store = timelineServer.getTimelineStore(); + + metricsPublisher = new SystemMetricsPublisher(); + metricsPublisher.init(conf); + metricsPublisher.start(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (metricsPublisher != null) { + metricsPublisher.stop(); + } + if (timelineServer != null) { + timelineServer.stop(); + } + AHSWebApp.resetInstance(); + } + + @Test(timeout = 10000) + public void testPublishApplicationMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = createRMApp(appId); + metricsPublisher.appCreated(app, app.getStartTime()); + metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); + TimelineEntity entity = null; + do { + entity = + store.getEntity(appId.toString(), + ApplicationMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + // verify all the fields + 
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, + entity.getEntityType()); + Assert + .assertEquals(app.getApplicationId().toString(), entity.getEntityId()); + Assert + .assertEquals( + app.getName(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.NAME_ENTITY_INFO)); + Assert.assertEquals(app.getQueue(), + entity.getOtherInfo() + .get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); + Assert + .assertEquals( + app.getUser(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.USER_ENTITY_INFO)); + Assert + .assertEquals( + app.getApplicationType(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.TYPE_ENTITY_INFO)); + Assert.assertEquals(app.getSubmitTime(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO)); + boolean hasCreatedEvent = false; + boolean hasFinishedEvent = false; + for (TimelineEvent event : entity.getEvents()) { + if (event.getEventType().equals( + ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + hasCreatedEvent = true; + Assert.assertEquals(app.getStartTime(), event.getTimestamp()); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals(app.getFinishTime(), event.getTimestamp()); + Assert.assertEquals( + app.getDiagnostics().toString(), + event.getEventInfo().get( + ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); + Assert.assertEquals( + app.getFinalApplicationStatus().toString(), + event.getEventInfo().get( + ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)); + Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event + .getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO)); + } + } + Assert.assertTrue(hasCreatedEvent && hasFinishedEvent); + } + + @Test(timeout = 10000) + public void testPublishAppAttemptMetrics() throws Exception { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); 
+ RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); + metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); + metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, + Integer.MAX_VALUE + 2L); + TimelineEntity entity = null; + do { + entity = + store.getEntity(appAttemptId.toString(), + AppAttemptMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + // verify all the fields + Assert.assertEquals(AppAttemptMetricsConstants.ENTITY_TYPE, + entity.getEntityType()); + Assert.assertEquals(appAttemptId.toString(), entity.getEntityId()); + Assert.assertEquals( + appAttemptId.getApplicationId().toString(), + entity.getPrimaryFilters() + .get(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER).iterator() + .next()); + boolean hasRegisteredEvent = false; + boolean hasFinishedEvent = false; + for (TimelineEvent event : entity.getEvents()) { + if (event.getEventType().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + hasRegisteredEvent = true; + Assert.assertEquals(appAttempt.getHost(), + event.getEventInfo() + .get(AppAttemptMetricsConstants.HOST_EVENT_INFO)); + Assert + .assertEquals(appAttempt.getRpcPort(), + event.getEventInfo().get( + AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO)); + Assert.assertEquals( + appAttempt.getMasterContainer().getId().toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)); + } else if (event.getEventType().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals(appAttempt.getDiagnostics(), event.getEventInfo() + .get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); + Assert.assertEquals(appAttempt.getTrackingUrl(), event.getEventInfo() + .get(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)); + Assert.assertEquals( + 
appAttempt.getOriginalTrackingUrl(), + event.getEventInfo().get( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)); + Assert.assertEquals( + appAttempt.getFinalApplicationStatus().toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO)); + Assert.assertEquals( + YarnApplicationAttemptState.FINISHED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.STATE_EVENT_INFO)); + } + } + Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent); + } + + @Test(timeout = 10000) + public void testPublishContainerMetrics() throws Exception { + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1); + RMContainer container = createRMContainer(containerId); + metricsPublisher.containerCreated(container, container.getCreationTime()); + metricsPublisher.containerFinished(container, container.getFinishTime()); + TimelineEntity entity = null; + do { + entity = + store.getEntity(containerId.toString(), + ContainerMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + // verify all the fields + Assert.assertEquals(ContainerMetricsConstants.ENTITY_TYPE, + entity.getEntityType()); + Assert.assertEquals(containerId.toString(), entity.getEntityId()); + Assert.assertEquals( + containerId.getApplicationAttemptId().toString(), + entity.getPrimaryFilters() + .get(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER).iterator() + .next()); + Assert.assertEquals( + container.getAllocatedNode().getHost(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedNode().getPort(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedResource().getMemory(), + 
entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedResource().getVirtualCores(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedPriority().getPriority(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO)); + boolean hasCreatedEvent = false; + boolean hasFinishedEvent = false; + for (TimelineEvent event : entity.getEvents()) { + if (event.getEventType().equals( + ContainerMetricsConstants.CREATED_EVENT_TYPE)) { + hasCreatedEvent = true; + Assert.assertEquals(container.getCreationTime(), event.getTimestamp()); + } else if (event.getEventType().equals( + ContainerMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals(container.getFinishTime(), event.getTimestamp()); + Assert.assertEquals( + container.getDiagnosticsInfo(), + event.getEventInfo().get( + ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); + Assert.assertEquals( + container.getContainerExitStatus(), + event.getEventInfo().get( + ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO)); + Assert.assertEquals(container.getContainerState().toString(), event + .getEventInfo().get(ContainerMetricsConstants.STATE_EVENT_INFO)); + } + } + Assert.assertTrue(hasCreatedEvent && hasFinishedEvent); + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp app = mock(RMApp.class); + when(app.getApplicationId()).thenReturn(appId); + when(app.getName()).thenReturn("test app"); + when(app.getApplicationType()).thenReturn("test app type"); + when(app.getUser()).thenReturn("test user"); + when(app.getQueue()).thenReturn("test queue"); + when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); + when(app.getDiagnostics()).thenReturn( + new 
StringBuilder("test diagnostics info")); + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(appId, 1)); + when(app.getCurrentAppAttempt()).thenReturn(appAttempt); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + return app; + } + + private static RMAppAttempt createRMAppAttempt( + ApplicationAttemptId appAttemptId) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + Container container = mock(Container.class); + when(container.getId()) + .thenReturn(ContainerId.newInstance(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); + when(appAttempt.getOriginalTrackingUrl()).thenReturn( + "test original tracking url"); + when(appAttempt.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + return appAttempt; + } + + private static RMContainer createRMContainer(ContainerId containerId) { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getAllocatedNode()).thenReturn( + NodeId.newInstance("test host", -100)); + when(container.getAllocatedResource()).thenReturn( + Resource.newInstance(-1, -1)); + when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); + when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); + when(container.getContainerExitStatus()).thenReturn(-1); + 
when(container.getContainerState()).thenReturn(ContainerState.COMPLETE); + return container; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index b63d2fe..2fff718 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -50,6 +50,7 @@ int failCount = 0; ApplicationId id; String url = null; + String oUrl = null; StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; @@ -184,6 +185,15 @@ public void setTrackingUrl(String url) { } @Override + public String getOriginalTrackingUrl() { + return oUrl; + } + + public void setOriginalTrackingUrl(String oUrl) { + this.oUrl = oUrl; + } + + @Override public StringBuilder getDiagnostics() { return diagnostics; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2fc4431..f045bd1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -89,6 +91,7 @@ private DrainDispatcher rmDispatcher; private RMStateStore store; private RMApplicationHistoryWriter writer; + private SystemMetricsPublisher publisher; private YarnScheduler scheduler; private TestSchedulerEventDispatcher schedulerDispatcher; @@ -198,6 +201,8 @@ public void setUp() throws Exception { new ClientToAMTokenSecretManagerInRM(), writer); ((RMContextImpl)rmContext).setStateStore(store); + publisher = mock(SystemMetricsPublisher.class); + ((RMContextImpl)rmContext).setSystemMetricsPublisher(publisher); rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext)); @@ -342,6 +347,7 @@ protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); verify(writer).applicationStarted(any(RMApp.class)); + verify(publisher).appCreated(any(RMApp.class), anyLong()); // NEW => 
NEW_SAVING event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); @@ -465,6 +471,7 @@ public void testUnmanagedApp() throws IOException { // reset the counter of Mockito.verify reset(writer); + reset(publisher); // test app fails after 1 app attempt failure LOG.info("--- START: testUnmanagedAppFailPath ---"); @@ -931,6 +938,10 @@ private void verifyApplicationFinished(RMAppState state) { ArgumentCaptor.forClass(RMAppState.class); verify(writer).applicationFinished(any(RMApp.class), finalState.capture()); Assert.assertEquals(state, finalState.getValue()); + finalState = ArgumentCaptor.forClass(RMAppState.class); + verify(publisher).appFinished(any(RMApp.class), finalState.capture(), + anyLong()); + Assert.assertEquals(state, finalState.getValue()); } private void verifyAppRemovedSchedulerEvent(RMAppState finalState) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index efcecd9..b852be6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static 
org.mockito.Mockito.times; @@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -126,6 +128,7 @@ private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; private RMApplicationHistoryWriter writer; + private SystemMetricsPublisher publisher; private RMStateStore store; @@ -238,6 +241,8 @@ public void setUp() throws Exception { store = mock(RMStateStore.class); ((RMContextImpl) rmContext).setStateStore(store); + publisher = mock(SystemMetricsPublisher.class); + ((RMContextImpl) rmContext).setSystemMetricsPublisher(publisher); scheduler = mock(YarnScheduler.class); masterService = mock(ApplicationMasterService.class); @@ -1315,6 +1320,11 @@ private void verifyApplicationAttemptFinished(RMAppAttemptState state) { verify(writer).applicationAttemptFinished( any(RMAppAttempt.class), finalState.capture()); Assert.assertEquals(state, finalState.getValue()); + finalState = + ArgumentCaptor.forClass(RMAppAttemptState.class); + verify(publisher).appAttemptFinished(any(RMAppAttempt.class), finalState.capture(), + anyLong()); + Assert.assertEquals(state, finalState.getValue()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 44f8381..49d99ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -88,10 +90,12 @@ public void testReleaseWhileRunning() { "host:3465", resource, priority, null); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); RMContext rmContext = mock(RMContext.class); when(rmContext.getDispatcher()).thenReturn(drainDispatcher); when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); RMContainer rmContainer = 
new RMContainerImpl(container, appAttemptId, nodeId, "user", rmContext); @@ -100,6 +104,7 @@ public void testReleaseWhileRunning() { assertEquals(nodeId, rmContainer.getAllocatedNode()); assertEquals(priority, rmContainer.getAllocatedPriority()); verify(writer).containerStarted(any(RMContainer.class)); + verify(publisher).containerCreated(any(RMContainer.class), anyLong()); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); @@ -132,6 +137,7 @@ public void testReleaseWhileRunning() { rmContainer.getContainerExitStatus()); assertEquals(ContainerState.COMPLETE, rmContainer.getContainerState()); verify(writer).containerFinished(any(RMContainer.class)); + verify(publisher).containerFinished(any(RMContainer.class), anyLong()); ArgumentCaptor captor = ArgumentCaptor .forClass(RMAppAttemptContainerFinishedEvent.class); @@ -173,10 +179,12 @@ public void testExpireWhileRunning() { "host:3465", resource, priority, null); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); RMContext rmContext = mock(RMContext.class); when(rmContext.getDispatcher()).thenReturn(drainDispatcher); when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, "user", rmContext); @@ -185,6 +193,7 @@ public void testExpireWhileRunning() { assertEquals(nodeId, rmContainer.getAllocatedNode()); assertEquals(priority, rmContainer.getAllocatedPriority()); verify(writer).containerStarted(any(RMContainer.class)); + verify(publisher).containerCreated(any(RMContainer.class), anyLong()); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); @@ -213,6 +222,8 @@ public void testExpireWhileRunning() { drainDispatcher.await(); 
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); + verify(publisher, never()).containerFinished(any(RMContainer.class), + anyLong()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index fd14ef6..66ec0e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -249,10 +250,12 @@ public void testSortedQueues() throws Exception { mock(ContainerAllocationExpirer.class); DrainDispatcher drainDispatcher = new DrainDispatcher(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); RMContext rmContext = mock(RMContext.class); 
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getDispatcher()).thenReturn(drainDispatcher); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( app_0.getApplicationId(), 1); ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index e548661..fa7145c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -92,6 +93,7 @@ public EventHandler getEventHandler() { new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), writer); + 
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); return rmContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3d38364..b4c4c10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -145,6 +146,8 @@ public void testAppAttemptMetrics() throws Exception { RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, null, null, null, writer); + ((RMContextImpl) rmContext).setSystemMetricsPublisher( + mock(SystemMetricsPublisher.class)); FifoScheduler scheduler = new FifoScheduler(); Configuration conf = new Configuration(); @@ -188,6 +191,8 @@ public void testNodeLocalAssignment() throws Exception { RMApplicationHistoryWriter writer = 
mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, writer); + ((RMContextImpl) rmContext).setSystemMetricsPublisher( + mock(SystemMetricsPublisher.class)); FifoScheduler scheduler = new FifoScheduler(); scheduler.setRMContext(rmContext); @@ -257,6 +262,8 @@ public void testUpdateResourceOnNode() throws Exception { RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, writer); + ((RMContextImpl) rmContext).setSystemMetricsPublisher( + mock(SystemMetricsPublisher.class)); FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused")