diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f65f1ac..1ed25fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -381,10 +382,18 @@ private void recoverApplication(ContainerManagerApplicationProto p) new LogAggregationContextPBImpl(p.getLogAggregationContext()); } - LOG.info("Recovering application " + appId); - //TODO: Recover flow and flow run ID - ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context, p.getAppLogAggregationInitedTime()); + FlowContext fc = null; + if (p.getFlowContext() != null) { + FlowContextProto fcp = p.getFlowContext(); + fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(), + fcp.getFlowRunId()); + } + + LOG.info("Recovering application " + appId + "Flow context is fName=" + + fc.getFlowName() + " fVersion=" + fc.getFlowVersion() + " fRunId=" + + fc.getFlowRunId()); + ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc, + appId, creds, context, p.getAppLogAggregationInitedTime()); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -936,7 +945,7 @@ private void performContainerPreStartChecks( private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, String user, Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, FlowContext flowContext) { ContainerManagerApplicationProto.Builder builder = ContainerManagerApplicationProto.newBuilder(); @@ -971,6 +980,15 @@ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, } } + builder.clearFlowContext(); + if (flowContext != null) { + FlowContextProto fcp = + FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName()) + .setFlowVersion(flowContext.getFlowVersion()) + .setFlowRunId(flowContext.getFlowRunId()).build(); + builder.setFlowContext(fcp); + } + return builder.build(); } @@ -1048,7 +1066,7 @@ protected void startContainerInternal( container.getLaunchContext().getApplicationACLs(); context.getNMStateStore().storeApplication(applicationID, buildAppProto(applicationID, user, credentials, appAcls, - logAggregationContext)); + logAggregationContext, flowContext)); dispatcher.getEventHandler().handle(new ApplicationInitEvent( applicationID, appAcls, logAggregationContext)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 444581c..0799a09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -106,13 +107,6 @@ public ApplicationImpl(Dispatcher dispatcher, String user, } public ApplicationImpl(Dispatcher dispatcher, String user, - ApplicationId appId, Credentials credentials, Context context, - long recoveredLogInitedTime) { - this(dispatcher, user, null, appId, credentials, context, - recoveredLogInitedTime); - } - - public ApplicationImpl(Dispatcher dispatcher, String user, FlowContext flowContext, ApplicationId appId, Credentials credentials, Context context, long recoveredLogInitedTime) { this.dispatcher = dispatcher; @@ -390,6 +384,15 @@ static ContainerManagerApplicationProto buildAppProto(ApplicationImpl app) builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp); + builder.clearFlowContext(); + if (app.flowContext != null) { + FlowContextProto fcp = FlowContextProto.newBuilder() + .setFlowName(app.flowContext.getFlowName()) + .setFlowVersion(app.flowContext.getFlowVersion()) + .setFlowRunId(app.flowContext.getFlowRunId()).build(); + builder.setFlowContext(fcp); + } + return builder.build(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index 0dfa20e..7831711 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -31,6 +31,7 @@ message ContainerManagerApplicationProto { repeated ApplicationACLMapProto acls = 4; optional LogAggregationContextProto log_aggregation_context = 5; optional int64 appLogAggregationInitedTime = 6 [ default = -1 ]; + optional FlowContextProto flowContext = 7; } message DeletionServiceDeleteTaskProto { @@ -52,3 +53,9 @@ message LogDeleterProto { optional string user = 1; optional int64 deletionTime = 2; } + +message FlowContextProto { + optional string flowName = 1; + optional string flowVersion = 2; + optional int64 flowRunId = 3; +}