diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java index b590a51f108..13918ccda99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.client.api.impl.AHSClientImpl; +import org.apache.hadoop.yarn.client.api.impl.AHSv2ClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -45,9 +48,11 @@ * Create a new instance of AHSClient. 
*/ @Public - public static AHSClient createAHSClient() { - AHSClient client = new AHSClientImpl(); - return client; + public static AHSClient createAHSClient(Configuration conf) { + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + return new AHSv2ClientImpl(); + } + return new AHSClientImpl(); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSv2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSv2ClientImpl.java new file mode 100644 index 00000000000..c3124ab283a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSv2ClientImpl.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.client.api.TimelineReaderClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.timeline.TimelineEntityV2Converter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class AHSv2ClientImpl extends AHSClient { + private TimelineReaderClient readerClient; + + public AHSv2ClientImpl() { + super(AHSv2ClientImpl.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) { + readerClient = TimelineReaderClient.createTimelineReaderClient(); + readerClient.init(conf); + } + + protected void setReaderClient(TimelineReaderClient readerClient) { + this.readerClient = readerClient; + } + + @Override + public void serviceStart() { + readerClient.start(); + } + + @Override + public void serviceStop() { + readerClient.stop(); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + TimelineEntity entity = readerClient.getApplicationEntity( + appId.toString()); + return TimelineEntityV2Converter.convertToApplicationReport(entity); + } + + @Override + public List<ApplicationReport> getApplications() + throws YarnException, IOException { + throw new UnsupportedOperationException("getApplications is not supported."); + } + + @Override + public ApplicationAttemptReport 
getApplicationAttemptReport( + ApplicationAttemptId applicationAttemptId) + throws YarnException, IOException { + ApplicationId appId = applicationAttemptId.getApplicationId(); + TimelineEntity entity = readerClient.getApplicationAttemptEntity( + appId.toString(), + applicationAttemptId.toString()); + return TimelineEntityV2Converter.convertToApplicationAttemptReport(entity); + } + + @Override + public List getApplicationAttempts( + ApplicationId applicationId) throws YarnException, IOException { + List entities = readerClient.getApplicationAttemptEntities( + applicationId.toString()); + List appAttemptReports = + new ArrayList<>(); + if (entities != null && !entities.isEmpty()) { + for (TimelineEntity entity : entities) { + ApplicationAttemptReport container = + TimelineEntityV2Converter.convertToApplicationAttemptReport( + entity); + appAttemptReports.add(container); + } + } + return appAttemptReports; + } + + @Override + public ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException { + ApplicationId appId = containerId.getApplicationAttemptId(). + getApplicationId(); + TimelineEntity entity = readerClient.getContainerEntity(appId.toString(), + containerId.toString()); + return TimelineEntityV2Converter.convertToContainerReport(entity); + } + + @Override + public List getContainers(ApplicationAttemptId + applicationAttemptId) throws YarnException, IOException { + ApplicationId appId = applicationAttemptId. 
+ getApplicationId(); + List entities = readerClient.getContainerEntities( + appId.toString(), applicationAttemptId.toString()); + List containers = + new ArrayList<>(); + if (entities != null && !entities.isEmpty()) { + for (TimelineEntity entity : entities) { + ContainerReport container = + TimelineEntityV2Converter.convertToContainerReport( + entity); + containers.add(container); + } + } + return containers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 1ceb46209b1..8c659e192f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -154,6 +154,8 @@ protected boolean timelineServiceBestEffort; private boolean loadResourceTypesFromServer; + private boolean timelineV2ServiceEnabled; + private static final String ROOT = "root"; public YarnClientImpl() { @@ -183,15 +185,19 @@ protected void serviceInit(Configuration conf) throws Exception { timelineService = TimelineUtils.buildTimelineTokenService(conf); } + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + timelineV2ServiceEnabled = true; + } + // The AHSClientService is enabled by default when we start the // TimelineServer which means we are able to get history information // for applications/applicationAttempts/containers by using ahsClient // when the TimelineServer is running. 
- if (timelineV1ServiceEnabled || conf.getBoolean( + if (timelineV2ServiceEnabled || timelineV1ServiceEnabled || conf.getBoolean( YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { historyServiceEnabled = true; - historyClient = AHSClient.createAHSClient(); + historyClient = AHSClient.createAHSClient(conf); historyClient.init(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java index f0e3ca2a4cc..9e63bfb71f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java @@ -64,7 +64,7 @@ @Test public void testClientStop() { Configuration conf = new Configuration(); - AHSClient client = AHSClient.createAHSClient(); + AHSClient client = AHSClient.createAHSClient(conf); client.init(conf); client.start(); client.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSv2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSv2ClientImpl.java new file mode 100644 index 00000000000..7eca57f2f02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSv2ClientImpl.java @@ -0,0 +1,222 @@ +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineReaderClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class TestAHSv2ClientImpl { + + private AHSv2ClientImpl client; + private TimelineReaderClient spyTimelineReaderClient; + @Before + public void setup() { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + client = new AHSv2ClientImpl(); + spyTimelineReaderClient = mock(TimelineReaderClient.class); + client.setReaderClient(spyTimelineReaderClient); + } + + @Test + public void testGetContainerReport() throws IOException, YarnException { + final ApplicationId appId = ApplicationId.newInstance(0, 1); + final ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + final ContainerId containerId = 
ContainerId.newContainerId(appAttemptId, 1); + when(spyTimelineReaderClient.getContainerEntity(appId.toString(), + containerId.toString())).thenReturn(createContainerEntity(containerId)); + ContainerReport report = client.getContainerReport(containerId); + Assert.assertEquals(report.getContainerId(), containerId); + Assert.assertEquals(report.getAssignedNode().getHost(), "test host"); + Assert.assertEquals(report.getAssignedNode().getPort(), 100); + Assert.assertEquals(report.getAllocatedResource().getVirtualCores(), 8); + } + + @Test + public void testGetAppAttemptReport() throws IOException, YarnException { + final ApplicationId appId = ApplicationId.newInstance(0, 1); + final ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + when(spyTimelineReaderClient.getApplicationAttemptEntity(appId.toString(), + appAttemptId.toString())) + .thenReturn(createAppAttemptTimelineEntity(appAttemptId)); + ApplicationAttemptReport report = + client.getApplicationAttemptReport(appAttemptId); + Assert.assertEquals(report.getApplicationAttemptId(), appAttemptId); + Assert.assertEquals(report.getFinishTime(), Integer.MAX_VALUE + 2L); + Assert.assertEquals(report.getOriginalTrackingUrl(), + "test original tracking url"); + } + + @Test + public void testGetAppReport() throws IOException, YarnException { + final ApplicationId appId = ApplicationId.newInstance(0, 1); + when(spyTimelineReaderClient.getApplicationEntity(appId.toString())) + .thenReturn(createApplicationTimelineEntity(appId, false, false)); + ApplicationReport report = client.getApplicationReport(appId); + Assert.assertEquals(report.getApplicationId(), appId); + Assert.assertEquals(report.getAppNodeLabelExpression(), "test_node_label"); + Assert.assertTrue(report.getApplicationTags().contains("Test_APP_TAGS_1")); + 
Assert.assertEquals(report.getYarnApplicationState(), + YarnApplicationState.FINISHED); + } + + private static TimelineEntity createApplicationTimelineEntity( + ApplicationId appId, boolean emptyACLs, + boolean wrongAppId) { + TimelineEntity entity = new TimelineEntity(); + entity.setType(ApplicationMetricsConstants.ENTITY_TYPE); + if (wrongAppId) { + entity.setId("wrong_app_id"); + } else { + entity.setId(appId.toString()); + } + + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app"); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + "test app type"); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "user1"); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + "test queue"); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, "false"); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + Priority.newInstance(0)); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + Integer.MAX_VALUE + 1L); + entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS, 123); + entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS, 345); + + entityInfo.put(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS, 456); + entityInfo.put(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS, 789); + + if (emptyACLs) { + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, ""); + } else { + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + "user2"); + } + + Set appTags = new HashSet(); + appTags.add("Test_APP_TAGS_1"); + appTags.add("Test_APP_TAGS_2"); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, appTags); + entity.setInfo(entityInfo); + + Map configs = new HashMap<>(); + configs.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + "test_node_label"); + entity.setConfigs(configs); + + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 1L + appId.getId()); + entity.addEvent(tEvent); + + // send a YARN_APPLICATION_STATE_UPDATED event + // after YARN_APPLICATION_FINISHED + // The final YarnApplicationState should not be changed + tEvent = new TimelineEvent(); + tEvent.setId( + ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L + appId.getId()); + Map eventInfo = new HashMap<>(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + YarnApplicationState.KILLED); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createAppAttemptTimelineEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setType(AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setId(appAttemptId.toString()); + + Map entityInfo = new HashMap(); + entityInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO, + "test tracking url"); + entityInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO, + "test original tracking url"); + entityInfo.put(AppAttemptMetricsConstants.HOST_INFO, "test host"); + entityInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO, 100); + entityInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO, + ContainerId.newContainerId(appAttemptId, 1)); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 1L); + entity.addEvent(tEvent); + + tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createContainerEntity(ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + 
entity.setType(ContainerMetricsConstants.ENTITY_TYPE); + entity.setId(containerId.toString()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO, 1024); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO, 8); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO, + "test host"); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO, 100); + entityInfo + .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO, -1); + entityInfo.put(ContainerMetricsConstants + .ALLOCATED_HOST_HTTP_ADDRESS_INFO, "http://test:1234"); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + "test diagnostics info"); + entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, -1); + entityInfo.put(ContainerMetricsConstants.STATE_INFO, + ContainerState.COMPLETE.toString()); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(123456); + entity.addEvent(tEvent); + + return entity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineReaderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineReaderClient.java new file mode 100644 index 00000000000..9b1c0018bf4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineReaderClient.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl; + +import java.io.IOException; +import java.util.List; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class TimelineReaderClient extends AbstractService { + + /** + * Create a new instance of TimelineReaderClient. + */ + @InterfaceAudience.Public + public static TimelineReaderClient createTimelineReaderClient() { + return new TimelineReaderClientImpl(); + } + + @InterfaceAudience.Private + public TimelineReaderClient(String name) { + super(name); + } + + /** + * Get Timeline entity for the container. 
+ * @param appId application id + * @param containerId container id + * @return timeline entity for container + * @throws IOException + */ + public abstract TimelineEntity getContainerEntity( + String appId, String containerId) throws IOException; + + /** + * Get container entities for an application. + * @param appId application id + * @param appAttemptId application attempt id + * @return list of entities + * @throws IOException + */ + public abstract List<TimelineEntity> getContainerEntities(String appId, + String appAttemptId) + throws IOException; + + /** + * Get application entity. + * @param appId application id + * @param entityType Entity type to be returned + * @return entity of the application + * @throws IOException + */ + public abstract TimelineEntity getApplicationEntity(String appId, + TimelineEntityType entityType) throws IOException; + + /** + * Get application entity. + * @param appId application id + * @return entity of the application + * @throws IOException + */ + public abstract TimelineEntity getApplicationEntity( + String appId) throws IOException; + + /** + * Get application attempt entity. + * @param appId application id + * @param appAttemptId application attempt id + * @return entity associated with application attempt + * @throws IOException + */ + public abstract TimelineEntity getApplicationAttemptEntity( + String appId, String appAttemptId) throws IOException; + + /** + * Get application attempt entities. + * @param appId application id + * @return list of application attempt entities + * @throws IOException + */ + public abstract List<TimelineEntity> getApplicationAttemptEntities( + String appId) throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineReaderClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineReaderClientImpl.java new file mode 100644 index 00000000000..4a92a1dc68d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineReaderClientImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.impl; + +import com.sun.jersey.api.client.config.DefaultClientConfig; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.client.api.TimelineReaderClient; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.config.ClientConfig; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +import static org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType.YARN_APPLICATION_ATTEMPT; +import static 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType.YARN_CONTAINER; + +public class TimelineReaderClientImpl extends TimelineReaderClient { + private static final Log LOG = LogFactory.getLog(TimelineReaderClientImpl.class); + + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; + + private Client client; + private volatile String timelineServiceAddress; + private Configuration conf; + + public TimelineReaderClientImpl() { + super(TimelineReaderClientImpl.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + ClientConfig cc = new DefaultClientConfig(); + cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); + client = Client.create(cc); + timelineServiceAddress = conf.get( + YarnConfiguration.TIMELINE_SERVICE_READER_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_READER_WEBAPP_ADDRESS); + } + + @Override + public TimelineEntity getApplicationEntity(String appId, + TimelineEntityType entityType) throws IOException { + String url = RESOURCE_URI_STR_V2 + + "apps/" + appId; + if (entityType != null) { + url += "/entities/" + entityType.toString(); + } + url += "?fields=ALL"; + URI uri = TimelineConnector.constructResURI(conf, timelineServiceAddress, url); + ClientResponse response = client.resource(uri).accept("application/json") + .get(ClientResponse.class); + if (response.getStatus() != 200) { + throw new IOException("Failed : HTTP error code : " + + response.getStatus()); + } + TimelineEntity entity = response.getEntity(TimelineEntity.class); + LOG.info("Entity: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + return entity; + } + + @Override + public TimelineEntity getContainerEntity(String appId, String containerId) + throws IOException { + String url = RESOURCE_URI_STR_V2 + + "apps/" + appId; + url += "/entities/" + YARN_CONTAINER.toString(); + url += "/" + containerId; + url += "?fields=ALL"; + URI uri = 
TimelineConnector.constructResURI(conf, timelineServiceAddress, url); + ClientResponse response = client.resource(uri).accept("application/json") + .get(ClientResponse.class); + if (response.getStatus() != 200) { + throw new IOException("Failed : HTTP error code : " + + response.getStatus()); + } + TimelineEntity entity = response.getEntity(TimelineEntity.class); + LOG.info("Entity: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + return entity; + } + + @Override + public List getContainerEntities(String appId, + String appAttemptId) throws IOException { + String url = RESOURCE_URI_STR_V2 + + "apps/" + appId; + url += "/entities/" + YARN_CONTAINER.toString(); + url += "?fields=ALL&infofilters=SYSTEM_INFO_PARENT_ENTITY eq {\"id\":\"" + + appAttemptId + "\",\"type\":\"YARN_APPLICATION_ATTEMPT\"}"; + URI uri = TimelineConnector.constructResURI(conf, timelineServiceAddress, url); + ClientResponse response = client.resource(uri).accept("application/json") + .get(ClientResponse.class); + if (response.getStatus() != 200) { + throw new IOException("Failed : HTTP error code : " + + response.getStatus()); + } + TimelineEntity[] entity = response.getEntity(TimelineEntity[].class); + LOG.info("Entity: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + return Arrays.asList(entity); + } + + @Override + public TimelineEntity getApplicationEntity(String appId) + throws IOException { + return getApplicationEntity(appId, null); + } + + @Override + public TimelineEntity getApplicationAttemptEntity(String appId, + String appAttemptId) throws IOException { + String url = RESOURCE_URI_STR_V2 + + "apps/" + appId; + url += "/entities/" + YARN_APPLICATION_ATTEMPT.toString(); + url += "/" + appAttemptId; + url += "?fields=ALL"; + URI uri = TimelineConnector.constructResURI(conf, timelineServiceAddress, url); + ClientResponse response = client.resource(uri).accept("application/json") + .get(ClientResponse.class); + if (response.getStatus() != 200) { + throw new IOException("Failed 
: HTTP error code : " + + response.getStatus()); + } + TimelineEntity entity = response.getEntity(TimelineEntity.class); + LOG.info("Entity: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + return entity; + } + + @Override + public List getApplicationAttemptEntities(String appId) + throws IOException { + String url = RESOURCE_URI_STR_V2 + + "apps/" + appId; + url += "/entities/" + YARN_APPLICATION_ATTEMPT.toString(); + url += "?fields=ALL"; + URI uri = TimelineConnector.constructResURI(conf, timelineServiceAddress, url); + ClientResponse response = client.resource(uri).accept("application/json") + .get(ClientResponse.class); + if (response.getStatus() != 200) { + throw new IOException("Failed : HTTP error code : " + + response.getStatus()); + } + TimelineEntity[] entities = response.getEntity(TimelineEntity[].class); + LOG.info("Entity: " + TimelineUtils.dumpTimelineRecordtoJSON(entities)); + return Arrays.asList(entities); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java similarity index 100% rename from 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineEntityV2Converter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineEntityV2Converter.java new file mode 100644 index 00000000000..8e4db19a0d9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineEntityV2Converter.java @@ -0,0 +1,426 @@ +package org.apache.hadoop.yarn.util.timeline; + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import 
org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; + +public class TimelineEntityV2Converter { + private TimelineEntityV2Converter() { + } + + public static ContainerReport convertToContainerReport( + TimelineEntity entity) { + int allocatedMem = 0; + int allocatedVcore = 0; + String allocatedHost = null; + int allocatedPort = -1; + int allocatedPriority = 0; + long createdTime = 0; + long finishedTime = 0; + String diagnosticsInfo = null; + int exitStatus = ContainerExitStatus.INVALID; + ContainerState state = null; + String nodeHttpAddress = null; + Map entityInfo = entity.getInfo(); + if (entityInfo != null) { + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO)) { + allocatedMem = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_MEMORY_INFO); + } + if (entityInfo + 
.containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_INFO)) { + allocatedVcore = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_VCORE_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_HOST_INFO)) { + allocatedHost = + entityInfo + .get(ContainerMetricsConstants.ALLOCATED_HOST_INFO) + .toString(); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_PORT_INFO)) { + allocatedPort = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_PORT_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO)) { + allocatedPriority = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO); + } + if (entityInfo.containsKey( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO)) { + nodeHttpAddress = + (String) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO); + } + if (entityInfo.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)) { + diagnosticsInfo = + entityInfo.get( + ContainerMetricsConstants.DIAGNOSTICS_INFO) + .toString(); + } + if (entityInfo.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)) { + exitStatus = (Integer) entityInfo.get( + ContainerMetricsConstants.EXIT_STATUS_INFO); + } + if (entityInfo.containsKey(ContainerMetricsConstants.STATE_INFO)) { + state = + ContainerState.valueOf(entityInfo.get( + ContainerMetricsConstants.STATE_INFO).toString()); + } + } + NavigableSet events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getId().equals( + ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE)) { + createdTime = event.getTimestamp(); + } else if (event.getId().equals( + ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE)) { + finishedTime = event.getTimestamp(); + } + } + } + String logUrl = null; + NodeId allocatedNode = null; + if (allocatedHost != null) { + allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort); + } + 
return ContainerReport.newInstance( + ContainerId.fromString(entity.getId()), + Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode, + Priority.newInstance(allocatedPriority), + createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state, + nodeHttpAddress); + } + + public static ApplicationAttemptReport convertToApplicationAttemptReport( + TimelineEntity entity) { + String host = null; + int rpcPort = -1; + ContainerId amContainerId = null; + String trackingUrl = null; + String originalTrackingUrl = null; + String diagnosticsInfo = null; + YarnApplicationAttemptState state = null; + Map entityInfo = entity.getInfo(); + long startTime = 0; + long finishTime = 0; + + if (entityInfo != null) { + if (entityInfo.containsKey(AppAttemptMetricsConstants.HOST_INFO)) { + host = + entityInfo.get(AppAttemptMetricsConstants.HOST_INFO) + .toString(); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.RPC_PORT_INFO)) { + rpcPort = (Integer) entityInfo.get( + AppAttemptMetricsConstants.RPC_PORT_INFO); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) { + amContainerId = + ContainerId.fromString(entityInfo.get( + AppAttemptMetricsConstants.MASTER_CONTAINER_INFO) + .toString()); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.TRACKING_URL_INFO)) { + trackingUrl = + entityInfo.get( + AppAttemptMetricsConstants.TRACKING_URL_INFO) + .toString(); + } + if (entityInfo + .containsKey( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)) { + originalTrackingUrl = + entityInfo + .get( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO) + .toString(); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO)) { + diagnosticsInfo = + entityInfo.get( + AppAttemptMetricsConstants.DIAGNOSTICS_INFO) + .toString(); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.STATE_INFO)) { + state = + YarnApplicationAttemptState.valueOf(entityInfo.get( + 
AppAttemptMetricsConstants.STATE_INFO) + .toString()); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) { + amContainerId = + ContainerId.fromString(entityInfo.get( + AppAttemptMetricsConstants.MASTER_CONTAINER_INFO) + .toString()); + } + } + NavigableSet events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getId().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + startTime = event.getTimestamp(); + } else if (event.getId().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + finishTime = event.getTimestamp(); + } + } + } + return ApplicationAttemptReport.newInstance( + ApplicationAttemptId.fromString(entity.getId()), + host, rpcPort, trackingUrl, originalTrackingUrl, diagnosticsInfo, + state, amContainerId, startTime, finishTime); + } + + public static ApplicationReport convertToApplicationReport( + TimelineEntity entity) { + String user = null; + String queue = null; + String name = null; + String type = null; + boolean unmanagedApplication = false; + long createdTime = 0; + long finishedTime = 0; + float progress = 0.0f; + int applicationPriority = 0; + ApplicationAttemptId latestApplicationAttemptId = null; + String diagnosticsInfo = null; + FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED; + YarnApplicationState state = YarnApplicationState.ACCEPTED; + ApplicationResourceUsageReport appResources = null; + Set appTags = null; + String appNodeLabelExpression = null; + String amNodeLabelExpression = null; + Map entityInfo = entity.getInfo(); + if (entityInfo != null) { + if (entityInfo.containsKey( + ApplicationMetricsConstants.USER_ENTITY_INFO)) { + user = + entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) { + queue = + entityInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO) + .toString(); + } + if 
(entityInfo.containsKey( + ApplicationMetricsConstants.NAME_ENTITY_INFO)) { + name = + entityInfo.get(ApplicationMetricsConstants.NAME_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.TYPE_ENTITY_INFO)) { + type = + entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.TYPE_ENTITY_INFO)) { + type = + entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO) + .toString(); + } + if (entityInfo + .containsKey( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)) { + unmanagedApplication = + Boolean.parseBoolean(entityInfo.get( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO) + .toString()); + } + if (entityInfo + .containsKey(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)) { + applicationPriority = Integer.parseInt(entityInfo.get( + ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO).toString()); + } + if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) { + appTags = new HashSet<>(); + Object obj = entityInfo.get(ApplicationMetricsConstants.APP_TAGS_INFO); + if (obj != null && obj instanceof Collection) { + for(Object o : (Collection)obj) { + if (o != null) { + appTags.add(o.toString()); + } + } + } + } + if (entityInfo + .containsKey( + ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)) { + latestApplicationAttemptId = ApplicationAttemptId.fromString( + entityInfo.get( + ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO) + .toString()); + } + if (entityInfo + .containsKey(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) { + diagnosticsInfo = + entityInfo.get( + ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO) + .toString(); + } + if (entityInfo + .containsKey(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)) { + finalStatus = + FinalApplicationStatus.valueOf(entityInfo.get( + ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO) + 
.toString()); + } + if (entityInfo + .containsKey(ApplicationMetricsConstants.STATE_EVENT_INFO)) { + state = + YarnApplicationState.valueOf(entityInfo.get( + ApplicationMetricsConstants.STATE_EVENT_INFO).toString()); + } + } + + Map configs = entity.getConfigs(); + if (configs + .containsKey(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION)) { + appNodeLabelExpression = configs + .get(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION); + } + if (configs + .containsKey(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION)) { + amNodeLabelExpression = + configs.get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION); + } + + Set metrics = entity.getMetrics(); + if (metrics != null) { + long vcoreSeconds = 0; + long memorySeconds = 0; + long preemptedVcoreSeconds = 0; + long preemptedMemorySeconds = 0; + + for (TimelineMetric metric : metrics) { + switch (metric.getId()) { + case ApplicationMetricsConstants.APP_CPU_METRICS: + vcoreSeconds = getAverageValue(metric.getValues().values()); + break; + case ApplicationMetricsConstants.APP_MEM_METRICS: + memorySeconds = getAverageValue(metric.getValues().values()); + break; + case ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS: + preemptedVcoreSeconds = getAverageValue(metric.getValues().values()); + break; + case ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS: + preemptedVcoreSeconds = getAverageValue(metric.getValues().values()); + break; + } + } + Map resourceSecondsMap = new HashMap<>(); + Map preemptedResoureSecondsMap = new HashMap<>(); + resourceSecondsMap + .put(ResourceInformation.MEMORY_MB.getName(), memorySeconds); + resourceSecondsMap + .put(ResourceInformation.VCORES.getName(), vcoreSeconds); + preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(), + preemptedMemorySeconds); + preemptedResoureSecondsMap + .put(ResourceInformation.VCORES.getName(), preemptedVcoreSeconds); + + appResources = ApplicationResourceUsageReport + .newInstance(0, 0, null, null, null, 
resourceSecondsMap, 0, 0, + preemptedResoureSecondsMap); + } + + NavigableSet events = entity.getEvents(); + long updatedTimeStamp = 0L; + if (events != null) { + for (TimelineEvent event : events) { + if (event.getId().equals( + ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + createdTime = event.getTimestamp(); + } else if (event.getId().equals( + ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) { + // This type of events are parsed in time-stamp descending order + // which means the previous event could override the information + // from the later same type of event. Hence compare timestamp + // before over writing. + if (event.getTimestamp() > updatedTimeStamp) { + updatedTimeStamp = event.getTimestamp(); + } + } else if (event.getId().equals( + ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) { + Map eventInfo = event.getInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo.containsKey( + ApplicationMetricsConstants.STATE_EVENT_INFO)) { + if (state == YarnApplicationState.ACCEPTED) { + state = YarnApplicationState.valueOf(eventInfo.get( + ApplicationMetricsConstants.STATE_EVENT_INFO).toString()); + } + } + } else if (event.getId().equals( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + progress=1.0F; + state = YarnApplicationState.FINISHED; + finishedTime = event.getTimestamp(); + } + } + } + return ApplicationReport.newInstance( + ApplicationId.fromString(entity.getId()), + latestApplicationAttemptId, user, queue, name, null, -1, null, state, + diagnosticsInfo, null, createdTime, finishedTime, finalStatus, + appResources, null, progress, type, null, appTags, unmanagedApplication, + Priority.newInstance(applicationPriority), appNodeLabelExpression, + amNodeLabelExpression); + } + + private static long getAverageValue(Collection values) { + if (values == null || values.isEmpty()) { + return 0; + } + long sum = 0; + for (Number value : values) { + sum += value.longValue(); + } + return sum/values.size(); + } +}