diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b8cc4fd..96db748 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2128,6 +2128,8 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10; + /** default version for any flow. */ + public static final String DEFAULT_FLOW_VERSION = "1"; /** * The time period for which timeline v2 client will wait for draining diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 125b046..617ff20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -385,13 +385,42 @@ private void recoverApplication(ContainerManagerApplicationProto p) } LOG.info("Recovering application " + appId); - //TODO: Recover flow and flow run ID - ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context, p.getAppLogAggregationInitedTime()); + FlowContext flowContext = recoverFlowContextFromStateStore(appId, + context.getConf()); + ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), + flowContext, appId, creds, context, p.getAppLogAggregationInitedTime()); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } + private FlowContext getDefaultFlowContext(ApplicationId appId) { + FlowContext fc = new FlowContext(appId.toString(), + YarnConfiguration.DEFAULT_FLOW_VERSION, appId.getClusterTimestamp()); + return fc; + } + + private FlowContext recoverFlowContextFromStateStore(ApplicationId appId, + Configuration conf) { + // TODO: Recover flow and flow run ID from state store + // currently, we are using the default flow context. + // Note: + // in upgrade situations, where there is no prior existing flow context, + // default would be used. + FlowContext defaultFlowContext = getDefaultFlowContext(appId); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + if (LOG.isWarnEnabled()) { + LOG.warn("Using default flow context " + defaultFlowContext + + " for app " + appId + ". " + + "This indicates that no existing flow context information " + + "was found for this app. " + + "This also implies aggregations in Timeline Service V2 " + + "will not reflect correct flow statistics since " + + "flow info is missing."); + } + } + return defaultFlowContext; + } + private void recoverContainer(RecoveredContainerState rcs) throws IOException { StartContainerRequest req = rcs.getStartRequest(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 4e14eb0..3b8c450 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -172,6 +172,12 @@ public String getFlowVersion() { public long getFlowRunId() { return flowRunId; } + + @Override + public String toString() { + return "{flowName=" + flowName + " flowVersion=" + flowVersion + + " flowRunId=" + flowRunId + "}"; + } } @Override