diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 864980b..6c630e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -87,6 +89,7 @@ */ @Public @Stable + @AtMostOnce public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException; @@ -121,6 +124,7 @@ public GetNewApplicationResponse getNewApplication( */ @Public @Stable + @AtMostOnce public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException; @@ -184,6 +188,7 @@ public KillApplicationResponse forceKillApplication( */ @Public @Stable + @Idempotent public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index a5ff9f6..2ff2d37 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -63,9 +64,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -145,30 +150,53 @@ public YarnClientApplication createApplication() submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); - appContext.setApplicationId(applicationId); + if (applicationId == null) { + applicationId = getNewApplication().getApplicationId(); + appContext.setApplicationId(applicationId); + } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); - rmClient.submitApplication(request); + // check whether the applicationId is present or not + // before we submit the application + try { + getApplicationReport(applicationId); + String message = "Application with id " + applicationId + + " is already present! Cannot add a duplicate!"; + LOG.warn(message); + RMAuditLogger.logFailure(UserGroupInformation.getCurrentUser() + .getShortUserName(), AuditConstants.SUBMIT_APP_REQUEST, + message, "YarnClient", "Exception in submitting application", + applicationId); + throw RPCUtil.getRemoteException(message); + } catch (ApplicationNotFoundException ex) { + // The applicationId is not present. + // submit the application with this applicationId + rmClient.submitApplication(request); + } int pollCount = 0; while (true) { - YarnApplicationState state = - getApplicationReport(applicationId).getYarnApplicationState(); - if (!state.equals(YarnApplicationState.NEW) && - !state.equals(YarnApplicationState.NEW_SAVING)) { - break; - } - // Notify the client through the log every 10 poll, in case the client - // is blocked here too long. - if (++pollCount % 10 == 0) { - LOG.info("Application submission is not finished, " + - "submitted application " + applicationId + - " is still in " + state); - } try { - Thread.sleep(submitPollIntervalMillis); - } catch (InterruptedException ie) { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + break; + } + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(submitPollIntervalMillis); + } catch (InterruptedException ie) { + } + } catch (ApplicationNotFoundException ex) { + rmClient.submitApplication(request); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 8900b16..049ae14 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -35,6 +35,9 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -252,4 +256,77 @@ private void verifyExpectedException(String exceptionMessage){ .contains("Application with id '" + fakeAppId + "' " + "doesn't exist in RM.")); } + + @Test + public void testAppSubmission() throws IOException, InterruptedException, + YarnException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + ApplicationId oldAppId = createApplication(); + ApplicationSubmissionContext appContext = + Records.newRecord(ApplicationSubmissionContext.class); + appContext.setApplicationId(oldAppId); + ContainerLaunchContext amContainer = + Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + + explicitFailover(); + ApplicationId newAppId = submitApplication(appContext); + assertEquals(newAppId, oldAppId); + } + + private ApplicationId createApplication() { + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + Configuration conf = new YarnConfiguration(this.conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + try { + appId = client.createApplication().getApplicationSubmissionContext() + .getApplicationId(); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't get an applicationId from the Active RM"); + } + return appId; + } + + private ApplicationId submitApplication( + ApplicationSubmissionContext appContext) { + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + Configuration conf = new YarnConfiguration(this.conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + try { + appId = client.submitApplication(appContext); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't submit an application to the Active RM"); + } + return appId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index cd2226f..a2361c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -288,16 +288,13 @@ public SubmitApplicationResponse submitApplication( throw RPCUtil.getRemoteException(ie); } - // Though duplication will checked again when app is put into rmContext, - // but it is good to fail the invalid submission as early as possible. + // The duplication has been checked before application submission if (rmContext.getRMApps().get(applicationId) != null) { - String message = "Application with id " + applicationId + - " is already present! Cannot add a duplicate!"; - LOG.warn(message); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - message, "ClientRMService", "Exception in submitting application", - applicationId); - throw RPCUtil.getRemoteException(message); + // handle the case when failover happens before yarnClient gets + // SubmitApplicationResponse, but application has been accepted. + if (rmContext.isHAEnabled()) { + return SubmitApplicationResponse.newInstance(); + } } if (submissionContext.getQueue() == null) {