diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index b9d283f..5a30277 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -586,6 +586,16 @@ public void rampDownReduces(int rampDown) { if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + if (rmWorkPreservingRestartEnabled) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + lastResponseID = 0; + + // Registering to allow RM to discover an active AM for this application + + register(); + break; + } case AM_SHUTDOWN: // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index a9b5ce5..aa9329a 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,7 +57,8 @@ private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); - private int lastResponseID; + protected int lastResponseID; + protected boolean rmWorkPreservingRestartEnabled; private Resource availableResources; private final RecordFactory recordFactory = @@ -131,6 +133,9 @@ public String toString() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + rmWorkPreservingRestartEnabled = conf.getBoolean(MRJobConfig + .MR_RM_WORK_PRESERVING_RECOVERY_ENABLED, + MRJobConfig.DEFAULT_MR_RM_WORK_PRESERVING_RECOVERY_ENABLED); nodeBlacklistingEnabled = conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled); @@ -163,6 +168,13 @@ protected AllocateResponse makeRemoteRequest() throws IOException { } catch (YarnException e) { throw new IOException(e); } + + if (rmWorkPreservingRestartEnabled && isResyncCommand(allocateResponse)) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + return allocateResponse; + } + lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -190,6 +202,12 @@ protected AllocateResponse makeRemoteRequest() throws IOException { blacklistRemovals.clear(); return allocateResponse; } + + private boolean isResyncCommand(AllocateResponse allocateResponse) + { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } // May be incorrect if there's multiple NodeManagers running on a single host. // knownNodeCount is based on node managers, not hosts. blacklisting is diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 33d79ee..c4a8c4c 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -533,6 +533,12 @@ MR_AM_PREFIX + "scheduler.connection.wait.interval-ms"; public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000; + + public static final String MR_RM_WORK_PRESERVING_RECOVERY_ENABLED = + MR_AM_PREFIX + "rm.work.preserving.recovery.enabled"; + public static final boolean DEFAULT_MR_RM_WORK_PRESERVING_RECOVERY_ENABLED = + true; + /** * How long to wait in milliseconds for the output committer to cancel * an operation when the job is being killed