Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 1594122) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (working copy) @@ -354,6 +354,10 @@ public static final String RM_HA_PREFIX = RM_PREFIX + "ha."; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final boolean DEFAULT_RM_HA_ENABLED = false; + + public static final String RM_WORKPRESERVING_RESTART_ENABLED = RM_PREFIX + + "workpreservingrestart.enabled"; + public static final boolean DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED = true; public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (revision 1594122) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (working copy) @@ -281,6 +281,7 @@ if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + continue; case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. 
Stopping callback."); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (revision 1594122) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (working copy) @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -76,7 +77,10 @@ Collections.singletonList(ResourceRequest.ANY); private int lastResponseId = 0; - + protected boolean rmWorkPreservingRestartEnabled; + protected String appHostName; + protected int appHostPort; + protected String appTrackingUrl; protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; @@ -157,6 +161,9 @@ @Override protected void serviceInit(Configuration conf) throws Exception { + rmWorkPreservingRestartEnabled = + conf.getBoolean(YarnConfiguration.RM_WORKPRESERVING_RESTART_ENABLED, + YarnConfiguration.DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED); RackResolver.init(conf); super.serviceInit(conf); } @@ -185,6 +192,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + this.appHostName = appHostName; + this.appHostPort = appHostPort; + this.appTrackingUrl = appTrackingUrl; 
Preconditions.checkArgument(appHostName != null, "The host name should not be null"); Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" @@ -249,7 +259,15 @@ } allocateResponse = rmClient.allocate(allocateRequest); - + if (isResyncCommand(allocateResponse) && rmWorkPreservingRestartEnabled) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + // reset lastResponseId to 0 + lastResponseId = 0; + // re-register with RM + registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); + return allocateResponse; + } synchronized (this) { // update these on successful RPC clusterNodeCount = allocateResponse.getNumClusterNodes(); @@ -261,7 +279,8 @@ } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc - if(allocateResponse == null) { + if (allocateResponse == null + || (isResyncCommand(allocateResponse) && rmWorkPreservingRestartEnabled)) { // we hit an exception in allocate() // preserve ask and release for next call to allocate() synchronized (this) { @@ -287,6 +306,11 @@ } return allocateResponse; } + + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } @Private @VisibleForTesting