Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (revision 1594452) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (working copy) @@ -119,6 +119,11 @@ if (allocateResponse.getAMCommand() != null) { switch(allocateResponse.getAMCommand()) { case AM_RESYNC: + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + this.lastResponseID = 0; + register(); + break; case AM_SHUTDOWN: LOG.info("Event from RM: shutting down Application Master"); // This can happen if the RM has been restarted. If it is in that state, Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (revision 1594452) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (working copy) @@ -586,6 +586,17 @@ if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + if (rmWorkPreservingRestartEnabled) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + lastResponseID = 0; + + // Registering to allow RM to discover an active AM for this + // application + register(); + addOutstandingAllocateRequestOnResync(); + break; + } case AM_SHUTDOWN: // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (revision 1594452) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,7 +57,8 @@ private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); - private int lastResponseID; + protected int lastResponseID; + protected boolean rmWorkPreservingRestartEnabled; private Resource availableResources; private final RecordFactory recordFactory = @@ -131,6 +133,9 @@ @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + rmWorkPreservingRestartEnabled = + conf.getBoolean(MRJobConfig.MR_RM_WORKPRESERVING_RESTART_ENABLED, + MRJobConfig.DEFAULT_MR_RM_WORKPRESERVING_RESTART_ENABLED); nodeBlacklistingEnabled = conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled); @@ -163,6 +168,10 @@ } catch (YarnException e) { throw new IOException(e); } + + if (rmWorkPreservingRestartEnabled && isResyncCommand(allocateResponse)) { + return allocateResponse; + } lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -190,7 +199,25 @@ blacklistRemovals.clear(); return allocateResponse; } - + + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + + protected void addOutstandingAllocateRequestOnResync() { + for (Map> remoteRequests : remoteRequestsTable + .values()) { + for (Map capabalities : remoteRequests + .values()) { + for (ResourceRequest rr : capabalities.values()) { + addResourceRequestToAsk(rr); + } + } + } + blacklistAdditions.addAll(blacklistedNodes); + } + // May be incorrect if there's multiple NodeManagers running on a single host. // knownNodeCount is based on node managers, not hosts. blacklisting is // currently based on hosts. Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (revision 1594452) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (working copy) @@ -532,6 +532,11 @@ public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS = MR_AM_PREFIX + "scheduler.connection.wait.interval-ms"; public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000; + + public static final String MR_RM_WORKPRESERVING_RESTART_ENABLED = + MR_AM_PREFIX + "rm.workpreservingrestart.enabled"; + public static final boolean DEFAULT_MR_RM_WORKPRESERVING_RESTART_ENABLED = + true; /** * How long to wait in milliseconds for the output committer to cancel Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 1594452) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (working copy) @@ -354,6 +354,10 @@ public static final String RM_HA_PREFIX = RM_PREFIX + "ha."; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final boolean DEFAULT_RM_HA_ENABLED = false; + + public static final String RM_WORKPRESERVING_RESTART_ENABLED = RM_PREFIX + + "workpreservingrestart.enabled"; + public static final boolean DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED = true; public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (revision 1594452) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (working copy) @@ -234,8 +234,7 @@ while (true) { try { responseQueue.put(response); - if (response.getAMCommand() == AMCommand.AM_RESYNC - || response.getAMCommand() == AMCommand.AM_SHUTDOWN) { + if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { return; } break; @@ -281,6 +280,7 @@ if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + continue; case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. Stopping callback."); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (revision 1594452) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (working copy) @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -76,7 +77,10 @@ Collections.singletonList(ResourceRequest.ANY); private int lastResponseId = 0; - + protected boolean rmWorkPreservingRestartEnabled; + protected String appHostName; + protected int appHostPort; + protected String appTrackingUrl; protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; @@ -157,6 +161,9 @@ @Override protected void serviceInit(Configuration conf) throws Exception { + rmWorkPreservingRestartEnabled = + conf.getBoolean(YarnConfiguration.RM_WORKPRESERVING_RESTART_ENABLED, + YarnConfiguration.DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED); RackResolver.init(conf); super.serviceInit(conf); } @@ -185,6 +192,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + this.appHostName = appHostName; + this.appHostPort = appHostPort; + this.appTrackingUrl = appTrackingUrl; Preconditions.checkArgument(appHostName != null, "The host name should not be null"); Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" @@ -249,7 +259,24 @@ } allocateResponse = rmClient.allocate(allocateRequest); - + if (isResyncCommand(allocateResponse) && rmWorkPreservingRestartEnabled) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + // reset lastResponseId to 0 + lastResponseId = 0; + // re register with RM + registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); + for (Map> remoteRequests : remoteRequestsTable + .values()) { + for (Map capabalities : remoteRequests + .values()) { + for (ResourceRequestInfo rr : capabalities.values()) { + addResourceRequestToAsk(rr.remoteRequest); + } + } + } + return allocateResponse; + } synchronized (this) { // update these on successful RPC clusterNodeCount = allocateResponse.getNumClusterNodes(); @@ -261,7 +288,8 @@ } } finally { // TODO how to differentiate remote yarn exception vs error in rpc - if(allocateResponse == null) { + if (allocateResponse == null + || (isResyncCommand(allocateResponse) && rmWorkPreservingRestartEnabled)) { // we hit an exception in allocate() // preserve ask and release for next call to allocate() synchronized (this) { @@ -287,6 +315,11 @@ } return allocateResponse; } + + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } @Private @VisibleForTesting Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (revision 1594452) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (working copy) @@ -212,7 +212,7 @@ final AllocateResponse rebootResponse = createAllocateResponse( new ArrayList(), new ArrayList(), null); - rebootResponse.setAMCommand(AMCommand.AM_RESYNC); + rebootResponse.setAMCommand(AMCommand.AM_SHUTDOWN); when(client.allocate(anyFloat())).thenReturn(rebootResponse); AMRMClientAsync asyncClient =