diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 0abaafb..d7e1a9f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; /** *

The protocol between clients and the ResourceManager @@ -107,7 +110,12 @@ public GetNewApplicationResponse getNewApplication( * {@link SubmitApplicationResponse} on accepting the submission and throws * an exception if it rejects the submission. However, this call needs to be * followed by {@link #getApplicationReport(GetApplicationReportRequest)} - * to make sure that the application gets properly submitted.

+ * to make sure that the application gets properly submitted. If a failover + * or RM restart happens before RMStateStore saves the ApplicationState, + * {@link #getApplicationReport(GetApplicationReportRequest)} will throw + * an {@link ApplicationNotFoundException}. Clients need to re-submit + * the application with the same {@link ApplicationSubmissionContext} when + * they catch the {@link ApplicationNotFoundException}.

* *

In secure mode,the ResourceManager verifies access to * queues etc. before accepting the application submission.

@@ -186,6 +194,7 @@ public KillApplicationResponse forceKillApplication( */ @Public @Stable + @Idempotent public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 0149130..27658a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -29,6 +29,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Text; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -45,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -94,6 +98,17 @@ public abstract YarnClientApplication createApplication() * otherwise, it will throw the {@link ApplicationIdNotProvidedException} *

* + *

After the {@link ApplicationClientProtocol#submitApplication + * (SubmitApplicationRequest)} call, we need to call + * {@link ApplicationClientProtocol#getApplicationReport + * (GetApplicationReportRequest)} to make sure that the application + * gets properly submitted. If a failover or RM restart happens before + * RMStateStore saves the ApplicationState, {@link ApplicationClientProtocol + * #getApplicationReport(GetApplicationReportRequest)} will throw + * an {@link ApplicationNotFoundException}. We automatically re-submit + * the application with the same {@link ApplicationSubmissionContext} when + * we catch the {@link ApplicationNotFoundException}.

+ * * @param appContext * {@link ApplicationSubmissionContext} containing all the details * needed to submit a new application diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index cb88fde..f7f955e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -187,35 +187,43 @@ public YarnClientApplication createApplication() int pollCount = 0; long startTime = System.currentTimeMillis(); - //TODO: YARN-1764:Handle RM fail overs after the submitApplication call. while (true) { - YarnApplicationState state = - getApplicationReport(applicationId).getYarnApplicationState(); - if (!state.equals(YarnApplicationState.NEW) && - !state.equals(YarnApplicationState.NEW_SAVING)) { - LOG.info("Submitted application " + applicationId); - break; - } + try { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + LOG.info("Submitted application " + applicationId); + break; + } - long elapsedMillis = System.currentTimeMillis() - startTime; - if (enforceAsyncAPITimeout() && - elapsedMillis >= asyncApiPollTimeoutMillis) { - throw new YarnException("Timed out while waiting for application " + - applicationId + " to be submitted successfully"); - } + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() && + elapsedMillis >= asyncApiPollTimeoutMillis) { + throw new YarnException("Timed out while waiting for application " + + applicationId + " to be submitted successfully"); + } - // Notify the client 
through the log every 10 poll, in case the client - // is blocked here too long. - if (++pollCount % 10 == 0) { - LOG.info("Application submission is not finished, " + - "submitted application " + applicationId + - " is still in " + state); - } - try { - Thread.sleep(submitPollIntervalMillis); - } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting for application " + applicationId - + " to be successfully submitted."); + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(submitPollIntervalMillis); + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for application " + + applicationId + + " to be successfully submitted."); + } + } catch (ApplicationNotFoundException ex) { + // A failover or RM restart happened before RMStateStore saved the + // ApplicationState, so submit the same request again. + LOG.info("Re-submit application " + applicationId + " with the " + + "same ApplicationSubmissionContext"); + rmClient.submitApplication(request); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java index d02d43b..0c7da89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java @@ -24,8 +24,10 @@ import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.junit.Test; @@ -80,10 +82,141 @@ private void verifySubmitApp(MockRM rm, RMApp app, count++; } // Verify submittion is successful - Assert.assertFalse(rm.getApplicationReport(app.getApplicationId()) - .getYarnApplicationState() == YarnApplicationState.NEW); - Assert.assertFalse(rm.getApplicationReport(app.getApplicationId()) - .getYarnApplicationState() == YarnApplicationState.NEW_SAVING); + YarnApplicationState state = + rm.getApplicationReport(app.getApplicationId()) + .getYarnApplicationState(); + Assert.assertTrue(state == YarnApplicationState.ACCEPTED + || state == YarnApplicationState.SUBMITTED); Assert.assertEquals(expectedAppId, app.getApplicationId()); } + + // There are two scenarios when RM failover happens + // after SubmitApplication Call: + // 1) RMStateStore already saved the ApplicationState when failover happens + // 2) RMStateStore did not save the ApplicationState when failover happens + + @Test + public void + testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState() + throws Exception { + // Test scenario 1 when RM failover happens + // after SubmitApplication Call: + // RMStateStore already saved the ApplicationState when failover happens + startRMs(); + + // Submit Application + // After submission, the applicationState will be saved in RMStateStore. 
+ RMApp app0 = rm1.submitApp(200); + + // Do the failover + explicitFailover(); + + // Since the applicationState has already been saved in RMStateStore + // before failover happens, the current active rm can load the previous + // applicationState. + ApplicationReport appReport = + rm2.getApplicationReport(app0.getApplicationId()); + + // verify previous submission is successful. + Assert.assertTrue(appReport.getYarnApplicationState() + == YarnApplicationState.ACCEPTED || + appReport.getYarnApplicationState() + == YarnApplicationState.SUBMITTED); + } + + @Test + public void + testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState() + throws Exception { + // Test scenario 2 when RM failover happens + // after SubmitApplication Call: + // RMStateStore did not save the ApplicationState when failover happens. + // Using customized RMAppManager. + startRMsWithCustomizedRMAppManager(); + + // Submit Application + // After submission, the applicationState will + // not be saved in RMStateStore + RMApp app0 = + rm1.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false); + + // Do the failover + explicitFailover(); + + // Since the applicationState is not saved in RMStateStore + // when failover happens, the current active RM can not load + // previous applicationState. + // Expect ApplicationNotFoundException by calling getApplicationReport(). 
+ try { + rm2.getApplicationReport(app0.getApplicationId()); + Assert.fail("Should get ApplicationNotFoundException here"); + } catch (ApplicationNotFoundException ex) { + // expected ApplicationNotFoundException + } + + // Submit the application with previous ApplicationId to current active RM + // This will mimic the similar behavior of YarnClient which will re-submit + // Application with previous applicationId + // when it catches the ApplicationNotFoundException + RMApp app1 = + rm2.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false, true, app0.getApplicationId()); + + verifySubmitApp(rm2, app1, app0.getApplicationId()); + } + + /** + * Test multiple calls of getApplicationReport, to make sure + * it is idempotent + */ + @Test + public void testGetApplicationReportIdempotent() throws Exception{ + // start two RMs, and transit rm1 to active, rm2 to standby + startRMs(); + + // Submit Application + // After submission, the applicationState will be saved in RMStateStore. 
+ RMApp app = rm1.submitApp(200); + + ApplicationReport appReport1 = + rm1.getApplicationReport(app.getApplicationId()); + Assert.assertTrue(appReport1.getYarnApplicationState() == + YarnApplicationState.ACCEPTED || + appReport1.getYarnApplicationState() == + YarnApplicationState.SUBMITTED); + + // call getApplicationReport again + ApplicationReport appReport2 = + rm1.getApplicationReport(app.getApplicationId()); + Assert.assertEquals(appReport1.getApplicationId(), + appReport2.getApplicationId()); + Assert.assertEquals(appReport1.getYarnApplicationState(), + appReport2.getYarnApplicationState()); + + // Do the failover + explicitFailover(); + + // call getApplicationReport + ApplicationReport appReport3 = + rm2.getApplicationReport(app.getApplicationId()); + Assert.assertEquals(appReport1.getApplicationId(), + appReport3.getApplicationId()); + Assert.assertEquals(appReport1.getYarnApplicationState(), + appReport3.getYarnApplicationState()); + + // call getApplicationReport again + ApplicationReport appReport4 = + rm2.getApplicationReport(app.getApplicationId()); + Assert.assertEquals(appReport3.getApplicationId(), + appReport4.getApplicationId()); + Assert.assertEquals(appReport3.getYarnApplicationState(), + appReport4.getYarnApplicationState()); + } }