diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index f52e654..72e3063 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -1081,10 +1081,10 @@ public void run() { // jobId, timestamp and entityType. private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createJobEntity(HistoryEvent event, long timestamp, JobId jobId, - String entityType) { - - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = - createBaseEntity(event, timestamp, entityType); + String entityType, boolean setCreatedTime) { + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(jobId.toString()); return entity; } @@ -1092,8 +1092,9 @@ public void run() { // create BaseEntity from HistoryEvent with adding other info, like: // timestamp and entityType. private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity - createBaseEntity(HistoryEvent event, long timestamp, String entityType) { - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = + createBaseEntity(HistoryEvent event, long timestamp, String entityType, + boolean setCreatedTime) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = event.toTimelineEvent(); tEvent.setTimestamp(timestamp); @@ -1101,6 +1102,10 @@ public void run() { new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.addEvent(tEvent); entity.setType(entityType); + if (setCreatedTime) { + entity.setCreatedTime(timestamp); + } + entity.setModifiedTime(timestamp); return entity; } @@ -1108,9 +1113,10 @@ public void run() { // taskId, jobId, timestamp, entityType and relatedJobEntity. private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createTaskEntity(HistoryEvent event, long timestamp, String taskId, - String entityType, String relatedJobEntity, JobId jobId) { - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = - createBaseEntity(event, timestamp, entityType); + String entityType, String relatedJobEntity, JobId jobId, + boolean setCreatedTime) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(taskId); entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString()); return entity; @@ -1121,9 +1127,9 @@ public void run() { private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createTaskAttemptEntity(HistoryEvent event, long timestamp, String taskAttemptId, String entityType, String relatedTaskEntity, - String taskId) { - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = - createBaseEntity(event, timestamp, entityType); + String taskId, boolean setCreatedTime) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(taskAttemptId); entity.addIsRelatedToEntity(relatedTaskEntity, taskId); return entity; @@ -1134,10 +1140,13 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null; String taskId = null; String taskAttemptId = null; + boolean setCreatedTime = false; switch (event.getEventType()) { // Handle job events case JOB_SUBMITTED: + setCreatedTime = true; + break; case JOB_STATUS_CHANGED: case JOB_INFO_CHANGED: case JOB_INITED: @@ -1152,6 +1161,7 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId, break; // Handle task events case TASK_STARTED: + setCreatedTime = true; taskId = ((TaskStartedEvent)event).getTaskId().toString(); break; case TASK_FAILED: @@ -1164,8 +1174,13 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId, taskId = ((TaskFinishedEvent)event).getTaskId().toString(); break; case MAP_ATTEMPT_STARTED: - case CLEANUP_ATTEMPT_STARTED: case REDUCE_ATTEMPT_STARTED: + setCreatedTime = true; + taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptStartedEvent)event). + getTaskAttemptId().toString(); + break; + case CLEANUP_ATTEMPT_STARTED: case SETUP_ATTEMPT_STARTED: taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); taskAttemptId = ((TaskAttemptStartedEvent)event). @@ -1204,17 +1219,18 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId, if (taskId == null) { // JobEntity tEntity = createJobEntity(event, timestamp, jobId, - MAPREDUCE_JOB_ENTITY_TYPE); + MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime); } else { if (taskAttemptId == null) { // TaskEntity tEntity = createTaskEntity(event, timestamp, taskId, - MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId); + MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, + jobId, setCreatedTime); } else { // TaskAttemptEntity tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId, MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE, - taskId); + taskId, setCreatedTime); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 2c5c300..561a8ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -119,12 +119,12 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) { @SuppressWarnings("unchecked") public void reportContainerResourceUsage(Container container, - long createdTime, String pId, Long pmemUsage, + long currentTime, String pId, Long pmemUsage, Float cpuUsageTotalCoresPercentage) { if (publishSystemMetrics && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) { ContainerEntity entity = - createContainerEntity(container.getContainerId()); + createContainerEntity(container.getContainerId(), currentTime); long currentTimeMillis = System.currentTimeMillis(); if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) { TimelineMetric memoryMetric = new TimelineMetric(); @@ -168,13 +168,14 @@ private void publishContainerCreatedEvent(ContainerEntity entity, tEvent.setTimestamp(timestamp); entity.addEvent(tEvent); + entity.setCreatedTime(timestamp); putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); } private void publishContainerFinishedEvent(ContainerStatus containerStatus, long timeStamp) { ContainerId containerId = containerStatus.getContainerId(); - TimelineEntity entity = createContainerEntity(containerId); + TimelineEntity entity = createContainerEntity(containerId, timeStamp); Map eventInfo = new HashMap(); eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, @@ -193,7 +194,8 @@ private void publishContainerFinishedEvent(ContainerStatus containerStatus, putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); } - private static ContainerEntity createContainerEntity(ContainerId containerId) { + private static ContainerEntity createContainerEntity(ContainerId containerId, + long timestamp) { ContainerEntity entity = new ContainerEntity(); entity.setId(containerId.toString()); Identifier parentIdentifier = new Identifier(); @@ -201,6 +203,7 @@ private static ContainerEntity createContainerEntity(ContainerId containerId) { .setType(TimelineEntityType.YARN_APPLICATION_ATTEMPT.name()); parentIdentifier.setId(containerId.getApplicationAttemptId().toString()); entity.setParent(parentIdentifier); + entity.setModifiedTime(timestamp); return entity; } @@ -308,7 +311,7 @@ public void handle(ContainerEvent event) { ContainerId containerId = event.getContainerID(); Container container = context.getContainers().get(containerId); long timestamp = event.getTimestamp(); - ContainerEntity entity = createContainerEntity(containerId); + ContainerEntity entity = createContainerEntity(containerId, timestamp); switch (event.getType()) { case INIT_CONTAINER: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index e0c593d..7d4a631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -92,7 +92,8 @@ boolean isPublishContainerMetrics() { @SuppressWarnings("unchecked") @Override public void appCreated(RMApp app, long createdTime) { - ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + ApplicationEntity entity = + createApplicationEntity(app.getApplicationId(), createdTime); entity.setQueue(app.getQueue()); entity.setCreatedTime(createdTime); @@ -130,7 +131,8 @@ public void appCreated(RMApp app, long createdTime) { @SuppressWarnings("unchecked") @Override public void appFinished(RMApp app, RMAppState state, long finishedTime) { - ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + ApplicationEntity entity = + createApplicationEntity(app.getApplicationId(), finishedTime); RMAppMetrics appMetrics = app.getRMAppMetrics(); entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, appMetrics.getVcoreSeconds()); @@ -164,7 +166,8 @@ public void appFinished(RMApp app, RMAppState state, long finishedTime) { @SuppressWarnings("unchecked") @Override public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { - ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + ApplicationEntity entity = + createApplicationEntity(app.getApplicationId(), updatedTime); Map entityInfo = new HashMap(); entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, appViewACLs); @@ -177,7 +180,8 @@ public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { @SuppressWarnings("unchecked") @Override public void appUpdated(RMApp app, long currentTimeMillis) { - ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + ApplicationEntity entity = + createApplicationEntity(app.getApplicationId(), currentTimeMillis); Map eventInfo = new HashMap(); eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, app.getQueue()); @@ -193,9 +197,10 @@ public void appUpdated(RMApp app, long currentTimeMillis) { } private static ApplicationEntity createApplicationEntity( - ApplicationId applicationId) { + ApplicationId applicationId, long timestamp) { ApplicationEntity entity = new ApplicationEntity(); entity.setId(applicationId.toString()); + entity.setModifiedTime(timestamp); return entity; } @@ -204,7 +209,7 @@ private static ApplicationEntity createApplicationEntity( public void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime) { TimelineEntity entity = - createAppAttemptEntity(appAttempt.getAppAttemptId()); + createAppAttemptEntity(appAttempt.getAppAttemptId(), registeredTime); entity.setCreatedTime(registeredTime); TimelineEvent tEvent = new TimelineEvent(); @@ -235,7 +240,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt, RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { ApplicationAttemptEntity entity = - createAppAttemptEntity(appAttempt.getAppAttemptId()); + createAppAttemptEntity(appAttempt.getAppAttemptId(), finishedTime); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); @@ -262,11 +267,12 @@ public void appAttemptFinished(RMAppAttempt appAttempt, } private static ApplicationAttemptEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { + ApplicationAttemptId appAttemptId, long timestamp) { ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); entity.setId(appAttemptId.toString()); entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), appAttemptId.getApplicationId().toString())); + entity.setModifiedTime(timestamp); return entity; } @@ -274,7 +280,8 @@ private static ApplicationAttemptEntity createAppAttemptEntity( @Override public void containerCreated(RMContainer container, long createdTime) { if (publishContainerMetrics) { - TimelineEntity entity = createContainerEntity(container.getContainerId()); + TimelineEntity entity = + createContainerEntity(container.getContainerId(), createdTime); entity.setCreatedTime(createdTime); TimelineEvent tEvent = new TimelineEvent(); @@ -309,7 +316,8 @@ public void containerCreated(RMContainer container, long createdTime) { @Override public void containerFinished(RMContainer container, long finishedTime) { if (publishContainerMetrics) { - TimelineEntity entity = createContainerEntity(container.getContainerId()); + TimelineEntity entity = + createContainerEntity(container.getContainerId(), finishedTime); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); @@ -330,11 +338,13 @@ public void containerFinished(RMContainer container, long finishedTime) { } } - private static ContainerEntity createContainerEntity(ContainerId containerId) { + private static ContainerEntity createContainerEntity(ContainerId containerId, + long timestamp) { ContainerEntity entity = new ContainerEntity(); entity.setId(containerId.toString()); entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT .name(), containerId.getApplicationAttemptId().toString())); + entity.setModifiedTime(timestamp); return entity; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 20a5b13..3c07336 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -44,10 +45,15 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; @@ -58,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.AfterClass; @@ -197,8 +204,7 @@ public void testPublishApplicationMetrics() throws Exception { + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; File appFile = new File(outputDirApp, timelineServiceFileName); Assert.assertTrue(appFile.exists()); - Assert.assertEquals("Expected 3 events to be published", 3, - getNumOfNonEmptyLines(appFile)); + verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE); } @Test(timeout = 10000) @@ -232,8 +238,7 @@ public void testPublishAppAttemptMetrics() throws Exception { + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; File appFile = new File(outputDirApp, timelineServiceFileName); Assert.assertTrue(appFile.exists()); - Assert.assertEquals("Expected 2 events to be published", 2, - getNumOfNonEmptyLines(appFile)); + verifyEntity(appFile,2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); } @Test(timeout = 10000) @@ -264,8 +269,8 @@ public void testPublishContainerMetrics() throws Exception { + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; File appFile = new File(outputDirApp, timelineServiceFileName); Assert.assertTrue(appFile.exists()); - Assert.assertEquals("Expected 2 events to be published", 2, - getNumOfNonEmptyLines(appFile)); + verifyEntity(appFile, 2, + ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); } private RMApp createAppAndRegister(ApplicationId appId) { @@ -278,20 +283,32 @@ private RMApp createAppAndRegister(ApplicationId appId) { return app; } - private long getNumOfNonEmptyLines(File entityFile) throws IOException { + private static void verifyEntity(File entityFile, long expectedEvents, + String eventForCreatedTime) throws IOException { BufferedReader reader = null; String strLine; long count = 0; try { reader = new BufferedReader(new FileReader(entityFile)); while ((strLine = reader.readLine()) != null) { - if (strLine.trim().length() > 0) + if (strLine.trim().length() > 0) { + TimelineEntity entity = FileSystemTimelineReaderImpl. + getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class); + for (TimelineEvent event : entity.getEvents()) { + if (event.getId().equals(eventForCreatedTime)) { + assertTrue(entity.getCreatedTime() > 0); + break; + } + } + assertTrue(entity.getModifiedTime() > 0); count++; + } } } finally { reader.close(); } - return count; + assertEquals("Expected " + expectedEvents + " events to be published", + count, expectedEvents); } private String getTimelineEntityDir(RMApp app) {