diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 34709104264..ad63720d9fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1102,24 +1102,8 @@ protected void startContainerInternal( // Create the application // populate the flow context from the launch context if the timeline // service v.2 is enabled - FlowContext flowContext = null; - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - String flowName = launchContext.getEnvironment() - .get(TimelineUtils.FLOW_NAME_TAG_PREFIX); - String flowVersion = launchContext.getEnvironment() - .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX); - String flowRunIdStr = launchContext.getEnvironment() - .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); - long flowRunId = 0L; - if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { - flowRunId = Long.parseLong(flowRunIdStr); - } - flowContext = new FlowContext(flowName, flowVersion, flowRunId); - if (LOG.isDebugEnabled()) { - LOG.debug("Flow context: " + flowContext - + " created for an application " + applicationID); - } - } + FlowContext flowContext = + getFlowContext(launchContext, applicationID); Application application = new ApplicationImpl(dispatcher, user, flowContext, @@ -1138,6 +1122,31 @@ protected void startContainerInternal( dispatcher.getEventHandler().handle(new ApplicationInitEvent( applicationID, appAcls, logAggregationContext)); } + } else if (containerTokenIdentifier.getContainerType() + == ContainerType.APPLICATION_MASTER) { + FlowContext flowContext = + getFlowContext(launchContext, applicationID); + if (flowContext != null) { + ApplicationImpl application = + (ApplicationImpl) context.getApplications().get(applicationID); + + // update flowContext reference in ApplicationImpl + application.setFlowContext(flowContext); + + // Required to update state store for recovery. + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, + container.getLaunchContext().getApplicationACLs(), + containerTokenIdentifier.getLogAggregationContext(), + flowContext)); + + LOG.info( + "Updated application reference with flowContext " + flowContext + + " for app " + applicationID); + } else { + LOG.info("TimelineService V2.0 is not enabled. Skipping updating " + + "flowContext for application " + applicationID); + } } this.context.getNMStateStore().storeContainer(containerId, @@ -1163,6 +1172,30 @@ protected void startContainerInternal( } } + private FlowContext getFlowContext(ContainerLaunchContext launchContext, + ApplicationId applicationID) { + FlowContext flowContext = null; + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + String flowName = launchContext.getEnvironment() + .get(TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment() + .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment() + .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.parseLong(flowRunIdStr); + } + flowContext = new FlowContext(flowName, flowVersion, flowRunId); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Flow context: " + flowContext + " created for an application " + + applicationID); + } + } + return flowContext; + } + protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( org.apache.hadoop.yarn.api.records.Token token, ContainerTokenIdentifier containerTokenIdentifier) throws YarnException, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 39be7a790c4..0c09f28f923 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -666,4 +666,8 @@ public String getFlowVersion() { public long getFlowRunId() { return flowContext == null ? 0L : flowContext.getFlowRunId(); } + + public void setFlowContext(FlowContext fc) { + this.flowContext = fc; + } }