diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index a9bbdf5..0481b35 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -20,15 +20,12 @@
 
 import java.io.File;
 import java.io.IOException;
-
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,9 +35,9 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -48,7 +45,6 @@
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -204,7 +200,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
       ApplicationReport appReport = apps.get(0);
       firstAppId = appReport.getApplicationId();
 
-      checkNewTimelineEvent(firstAppId);
+      checkNewTimelineEvent(firstAppId, appReport);
 
       LOG.info("Run 2nd job which should be failed.");
       job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
@@ -213,11 +209,10 @@
 
       apps = yarnClient.getApplications(appStates);
       Assert.assertEquals(apps.size(), 2);
-
-      ApplicationId secAppId = null;
-      secAppId = apps.get(0).getApplicationId() == firstAppId ?
-          apps.get(1).getApplicationId() : apps.get(0).getApplicationId();
-      checkNewTimelineEvent(firstAppId);
+
+      appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
+          apps.get(0) : apps.get(1);
+      checkNewTimelineEvent(firstAppId, appReport);
     } finally {
       if (cluster != null) {
@@ -234,7 +229,8 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
     }
   }
 
-  private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
+  private void checkNewTimelineEvent(ApplicationId appId,
+      ApplicationReport appReport) throws IOException {
     String tmpRoot =
        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
        + "/entities/";
@@ -242,15 +238,18 @@ private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
     File tmpRootFolder = new File(tmpRoot);
     Assert.assertTrue(tmpRootFolder.isDirectory());
 
-    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
-        UserGroupInformation.getCurrentUser().getShortUserName() +
-        "/" + TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
-        "/1/1/" + appId.toString();
+    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
+        "/" + UserGroupInformation.getCurrentUser().getShortUserName() +
+        "/" + appReport.getName() +
+        "/" + TimelineUtils.DEFAULT_FLOW_VERSION +
+        "/" + appReport.getStartTime() +
+        "/" + appId.toString();
 
     // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
     String outputDirJob = basePath + "/MAPREDUCE_JOB/";
     File entityFolder = new File(outputDirJob);
-    Assert.assertTrue("Job output directory: " + outputDirJob + " is not exist.",
+    Assert.assertTrue("Job output directory: " + outputDirJob +
+        " does not exist.",
         entityFolder.isDirectory());
 
     // check for job event file
@@ -259,13 +258,15 @@ private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
     String jobEventFilePath = outputDirJob + jobEventFileName;
     File jobEventFile = new File(jobEventFilePath);
-    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not exist.",
+    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
+        " does not exist.",
         jobEventFile.exists());
 
     // check for task event file
     String outputDirTask = basePath + "/MAPREDUCE_TASK/";
     File taskFolder = new File(outputDirTask);
-    Assert.assertTrue("Task output directory: " + outputDirTask + " is not exist.",
+    Assert.assertTrue("Task output directory: " + outputDirTask +
+        " does not exist.",
         taskFolder.isDirectory());
 
     String taskEventFileName = appId.toString().replaceAll("application", "task")
@@ -273,14 +274,15 @@ private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
     String taskEventFilePath = outputDirTask + taskEventFileName;
     File taskEventFile = new File(taskEventFilePath);
-    Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not exist.",
+    Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
+        " does not exist.",
         taskEventFile.exists());
 
     // check for task attempt event file
     String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
     File taskAttemptFolder = new File(outputDirTaskAttempt);
     Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
-        " is not exist.", taskAttemptFolder.isDirectory());
+        " does not exist.", taskAttemptFolder.isDirectory());
 
     String taskAttemptEventFileName = appId.toString().replaceAll(
         "application", "attempt") + "_m_000000_0" +
@@ -290,7 +292,7 @@ private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
         taskAttemptEventFileName;
     File taskAttemptEventFile = new File(taskAttemptEventFilePath);
     Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
-        " is not exist.", taskAttemptEventFile.exists());
+        " does not exist.", taskAttemptEventFile.exists());
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index f141ca2..15e21f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -283,13 +283,14 @@ public void run() {
     boolean verified = false;
     String errorMessage = "";
     ApplicationId appId = null;
+    ApplicationReport appReport = null;
     while(!verified) {
       List<ApplicationReport> apps = yarnClient.getApplications();
       if (apps.size() == 0 ) {
         Thread.sleep(10);
         continue;
       }
-      ApplicationReport appReport = apps.get(0);
+      appReport = apps.get(0);
       appId = appReport.getApplicationId();
       if(appReport.getHost().equals("N/A")) {
         Thread.sleep(10);
@@ -316,7 +317,7 @@ public void run() {
     if (!isTestingTimelineV2) {
       checkTimelineV1(haveDomain);
     } else {
-      checkTimelineV2(haveDomain, appId, defaultFlow);
+      checkTimelineV2(haveDomain, appId, defaultFlow, appReport);
     }
   }
 
@@ -366,7 +367,7 @@ private void checkTimelineV1(boolean haveDomain) throws Exception {
   }
 
   private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
-      boolean defaultFlow) throws Exception {
+      boolean defaultFlow, ApplicationReport appReport) throws Exception {
     LOG.info("Started checkTimelineV2 ");
     // For PoC check in /tmp/timeline_service_data YARN-3264
     String tmpRoot =
@@ -379,10 +380,13 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
     String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
         UserGroupInformation.getCurrentUser().getShortUserName() +
-        (defaultFlow ? "/" +
-            TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
-            "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
-        appId.toString();
+        (defaultFlow ?
+            "/" + appReport.getName() + "/" +
+            TimelineUtils.DEFAULT_FLOW_VERSION +"/" +
+            appReport.getStartTime() +"/" :
+            "/test_flow_name/test_flow_version/12345678/") +
+        appId.toString();
+
+    LOG.info("basePath: " + basePath);
 
     // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
     // Verify DS_APP_ATTEMPT entities posted by the client
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 53f5af2..ccb7105 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -49,6 +49,7 @@
       "TIMELINE_FLOW_VERSION_TAG";
   public static final String FLOW_RUN_ID_TAG_PREFIX =
       "TIMELINE_FLOW_RUN_ID_TAG";
+  public final static String DEFAULT_FLOW_VERSION = "1";
 
   private static ObjectMapper mapper;
 
@@ -127,9 +128,12 @@ public static Text buildTimelineTokenService(Configuration conf) {
     return SecurityUtil.buildTokenService(timelineServiceAddr);
   }
 
-  public static String generateDefaultFlowNameBasedOnAppId(
+  public static String generateDefaultFlowName(String appName,
       ApplicationId appId) {
-    return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
+    return (appName != null &&
+        !appName.equals(YarnConfiguration.DEFAULT_APPLICATION_NAME)) ?
+        appName :
+        "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 9a7638c..2db956a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -210,21 +211,49 @@ protected void setupTokens(
     // Set AppSubmitTime to be consumable by the AM.
     ApplicationId applicationId =
         application.getAppAttemptId().getApplicationId();
+    RMApp app = rmContext.getRMApps().get(applicationId);
     environment.put(
         ApplicationConstants.APP_SUBMIT_TIME_ENV,
-        String.valueOf(rmContext.getRMApps()
-            .get(applicationId)
-            .getSubmitTime()));
+        String.valueOf(app.getSubmitTime()));
 
     if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
-      // Set flow context info
-      for (String tag :
-          rmContext.getRMApps().get(applicationId).getApplicationTags()) {
-        setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
-        setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
-        setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
+      // initialize the flow in the environment with default values for those
+      // that do not specify the flow tags
+      // flow name: app name (or app id if app name is missing),
+      // flow version: "1", flow run id: start time
+      setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+          TimelineUtils.generateDefaultFlowName(app.getName(), applicationId));
+      setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+          TimelineUtils.DEFAULT_FLOW_VERSION);
+      setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+          String.valueOf(app.getStartTime()));
+
+      // Set flow context info: the flow context is received via the application
+      // tags
+      for (String tag : app.getApplicationTags()) {
+        String[] parts = tag.split(":", 2);
+        if (parts.length != 2 || parts[1].isEmpty()) {
+          continue;
+        }
+        switch (parts[0].toUpperCase()) {
+        case TimelineUtils.FLOW_NAME_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+              parts[1]);
+          break;
+        case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+              parts[1]);
+          break;
+        case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+              parts[1]);
+          break;
+        default:
+          break;
+        }
       }
     }
+
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = container.getTokens();
@@ -246,13 +275,9 @@ protected void setupTokens(
   }
 
   private static void setFlowTags(
-      Map<String, String> environment, String tagPrefix, String tag) {
-    if (tag.startsWith(tagPrefix + ":") ||
-        tag.startsWith(tagPrefix.toLowerCase() + ":")) {
-      String value = tag.substring(tagPrefix.length() + 1);
-      if (!value.isEmpty()) {
-        environment.put(tagPrefix, value);
-      }
+      Map<String, String> environment, String tagPrefix, String value) {
+    if (!value.isEmpty()) {
+      environment.put(tagPrefix, value);
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
index ff055a1..fca38cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -18,13 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
@@ -35,6 +39,9 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class RMTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(RMTimelineCollectorManager.class);
+
   private RMContext rmContext;
 
   public RMTimelineCollectorManager(RMContext rmContext) {
@@ -51,9 +58,21 @@ public void postPut(ApplicationId appId, TimelineCollector collector) {
           "non-existing app " + appId);
     }
     String userId = app.getUser();
+    TimelineCollectorContext context = collector.getTimelineEntityContext();
     if (userId != null && !userId.isEmpty()) {
-      collector.getTimelineEntityContext().setUserId(userId);
+      context.setUserId(userId);
     }
+
+    // initialize the flow in the environment with default values for those
+    // that do not specify the flow tags
+    // flow name: app name (or app id if app name is missing),
+    // flow version: "1", flow run id: start time
+    context.setFlowName(TimelineUtils.generateDefaultFlowName(
+        app.getName(), appId));
+    context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION);
+    context.setFlowRunId(app.getStartTime());
+
+    // the flow context is received via the application tags
     for (String tag : app.getApplicationTags()) {
       String[] parts = tag.split(":", 2);
       if (parts.length != 2 || parts[1].isEmpty()) {
@@ -61,14 +80,22 @@ public void postPut(ApplicationId appId, TimelineCollector collector) {
       }
       switch (parts[0].toUpperCase()) {
       case TimelineUtils.FLOW_NAME_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowName(parts[1]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow name: " + parts[1]);
+        }
+        context.setFlowName(parts[1]);
         break;
       case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowVersion(parts[1]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow version: " + parts[1]);
+        }
+        context.setFlowVersion(parts[1]);
         break;
       case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowRunId(
-            Long.parseLong(parts[1]));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow run id: " + parts[1]);
+        }
+        context.setFlowRunId(Long.parseLong(parts[1]));
         break;
       default:
         break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 1a58cb2..7a018ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -83,8 +83,6 @@
   private static TimelineServiceV2Publisher metricsPublisher;
   private static DrainDispatcher dispatcher = new DrainDispatcher();
-  private static final String DEFAULT_FLOW_VERSION = "1";
-  private static final long DEFAULT_FLOW_RUN = 1;
 
   private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
@@ -316,16 +314,14 @@ private static void verifyEntity(File entityFile, long expectedEvents,
 
   private String getTimelineEntityDir(RMApp app) {
     String outputDirApp =
-        testRootDir.getAbsolutePath()+"/"
-            + FileSystemTimelineWriterImpl.ENTITIES_DIR
-            + "/"
-            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID
-            + "/"
-            + app.getUser()
-            + "/"
-            + TimelineUtils.generateDefaultFlowNameBasedOnAppId(app
-                .getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/"
-            + DEFAULT_FLOW_RUN + "/" + app.getApplicationId();
+        testRootDir.getAbsolutePath() + "/" +
+        FileSystemTimelineWriterImpl.ENTITIES_DIR + "/" +
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+        app.getUser() + "/" +
+        app.getName() + "/" +
+        TimelineUtils.DEFAULT_FLOW_VERSION + "/" +
+        app.getStartTime() + "/" +
+        app.getApplicationId();
     return outputDirApp;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 36dd7b0..4fe445a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Service that handles writes to the timeline service and writes them to the
@@ -54,13 +54,6 @@ protected void serviceInit(Configuration conf) throws Exception {
     // context info from NM.
     // Current user usually is not the app user, but keep this field non-null
     context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
-    // Use app ID to generate a default flow name for orphan app
-    context.setFlowName(
-        TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId));
-    // Set the flow version to string 1 if it's an orphan app
-    context.setFlowVersion("1");
-    // Set the flow run ID to 1 if it's an orphan app
-    context.setFlowRunId(1L);
     context.setAppId(appId.toString());
     super.serviceInit(conf);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
index 785fb19..75557a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -164,18 +164,30 @@ private void updateTimelineCollectorContext(
         getNMCollectorService().getTimelineCollectorContext(request);
     String userId = response.getUserId();
     if (userId != null && !userId.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the user in the context: " + userId);
+      }
       collector.getTimelineEntityContext().setUserId(userId);
     }
     String flowName = response.getFlowName();
     if (flowName != null && !flowName.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow name: " + flowName);
+      }
      collector.getTimelineEntityContext().setFlowName(flowName);
     }
     String flowVersion = response.getFlowVersion();
     if (flowVersion != null && !flowVersion.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow version: " + flowVersion);
+      }
       collector.getTimelineEntityContext().setFlowVersion(flowVersion);
     }
     long flowRunId = response.getFlowRunId();
     if (flowRunId != 0L) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow run id: " + flowRunId);
+      }
       collector.getTimelineEntityContext().setFlowRunId(flowRunId);
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
index 58d68df..981ee2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
@@ -19,12 +19,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
  * Encapsulates context information required by collector during a put.
  */
 public class TimelineCollectorContext extends TimelineContext {
-
   private String flowVersion;
 
   public TimelineCollectorContext() {
@@ -34,7 +34,8 @@ public TimelineCollectorContext() {
   public TimelineCollectorContext(String clusterId, String userId,
       String flowName, String flowVersion, Long flowRunId, String appId) {
     super(clusterId, userId, flowName, flowRunId, appId);
-    this.flowVersion = flowVersion;
+    this.flowVersion = flowVersion == null ?
+        TimelineUtils.DEFAULT_FLOW_VERSION : flowVersion;
   }
 
   @Override