diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 864980b..6c630e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -87,6 +89,7 @@ */ @Public @Stable + @AtMostOnce public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException; @@ -121,6 +124,7 @@ public GetNewApplicationResponse getNewApplication( */ @Public @Stable + @AtMostOnce public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException; @@ -184,6 +188,7 @@ public KillApplicationResponse forceKillApplication( */ @Public @Stable + @Idempotent public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index a5ff9f6..7f0df56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -145,30 +146,38 @@ public YarnClientApplication createApplication() submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); - appContext.setApplicationId(applicationId); + if (applicationId == null) { + applicationId = getNewApplication().getApplicationId(); + appContext.setApplicationId(applicationId); + } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); + rmClient.submitApplication(request); int pollCount = 0; while (true) { - YarnApplicationState state = - getApplicationReport(applicationId).getYarnApplicationState(); - if (!state.equals(YarnApplicationState.NEW) && - !state.equals(YarnApplicationState.NEW_SAVING)) { - break; - } - // Notify the client through the log every 10 poll, in case the client - // is blocked here too long. - if (++pollCount % 10 == 0) { - LOG.info("Application submission is not finished, " + - "submitted application " + applicationId + - " is still in " + state); - } try { - Thread.sleep(submitPollIntervalMillis); - } catch (InterruptedException ie) { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + break; + } + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(submitPollIntervalMillis); + } catch (InterruptedException ie) { + } + } catch (ApplicationNotFoundException ex) { + rmClient.submitApplication(request); } }