diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 0abaafb..257292d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -89,6 +91,7 @@ */ @Public @Stable + @Idempotent public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException; @@ -123,6 +126,7 @@ public GetNewApplicationResponse getNewApplication( */ @Public @Stable + @AtMostOnce public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException; @@ -186,6 +190,7 @@ public KillApplicationResponse forceKillApplication( */ @Public @Stable + @Idempotent public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index feb3bb7..3559d10 
100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -172,43 +172,66 @@ public YarnClientApplication createApplication() submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); - appContext.setApplicationId(applicationId); + + if (applicationId == null) { + applicationId = getNewApplication().getApplicationId(); + appContext.setApplicationId(applicationId); + } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); - rmClient.submitApplication(request); + // check whether the applicationId is present or not + // before we submit the application. + try { + getApplicationReport(applicationId); + String message = "Application with id " + applicationId + + " is already present! Cannot add a duplicate!"; + LOG.error(message); + throw new YarnException(message); + } catch (ApplicationNotFoundException ex) { + // The applicationId is not present. 
+ // submit the application with this applicationId + rmClient.submitApplication(request); + } int pollCount = 0; long startTime = System.currentTimeMillis(); while (true) { - YarnApplicationState state = - getApplicationReport(applicationId).getYarnApplicationState(); - if (!state.equals(YarnApplicationState.NEW) && - !state.equals(YarnApplicationState.NEW_SAVING)) { - LOG.info("Submitted application " + applicationId); - break; - } + try { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + LOG.info("Submitted application " + applicationId); + break; + } - long elapsedMillis = System.currentTimeMillis() - startTime; - if (enforceAsyncAPITimeout() && - elapsedMillis >= asyncApiPollTimeoutMillis) { - throw new YarnException("Timed out while waiting for application " + - applicationId + " to be submitted successfully"); - } + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() && + elapsedMillis >= asyncApiPollTimeoutMillis) { + throw new YarnException("Timed out while waiting for application " + + applicationId + " to be submitted successfully"); + } - // Notify the client through the log every 10 poll, in case the client - // is blocked here too long. - if (++pollCount % 10 == 0) { - LOG.info("Application submission is not finished, " + - "submitted application " + applicationId + - " is still in " + state); - } - try { - Thread.sleep(submitPollIntervalMillis); - } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting for application " + applicationId - + " to be successfully submitted."); + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. 
+ if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(submitPollIntervalMillis); + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for application " + + applicationId + + " to be successfully submitted."); + } + } catch (ApplicationNotFoundException ex) { + // FailOver or RM restart happens before RMStateStore saves + // ApplicationState + rmClient.submitApplication(request); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index a57d507..7705564 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -35,6 +35,9 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -109,10 +113,7 @@ public void teardown() { private void verifyClientConnection() { int 
numRetries = 3; while(numRetries-- > 0) { - Configuration conf = new YarnConfiguration(this.conf); - YarnClient client = YarnClient.createYarnClient(); - client.init(conf); - client.start(); + YarnClient client = createAndStartYarnClient(this.conf); try { client.getApplications(); return; @@ -252,4 +253,96 @@ private void verifyExpectedException(String exceptionMessage){ .contains("Application with id '" + fakeAppId + "' " + "doesn't exist in RM.")); } + + @Test + public void testAppSubmissionWithApplicationId() throws IOException, + InterruptedException, YarnException { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + ApplicationId oldAppId = createApplication(); + ApplicationId newAppId = submitApplication(oldAppId); + assertEquals(newAppId, oldAppId); + } + + @Test + public void testAppSubmissionWithoutApplicationId() throws IOException, + InterruptedException, YarnException { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + ApplicationId newAppId = submitApplication(null); + assertTrue(newAppId != null); + } + + private ApplicationId createApplication() { + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + YarnClient client = createAndStartYarnClient(this.conf); + try { + appId = client.createApplication().getApplicationSubmissionContext() + .getApplicationId(); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't get an applicationId from the Active RM"); + } + return appId; + } + + private ApplicationId submitApplication(ApplicationId oldAppId) + throws IOException { + 
ApplicationSubmissionContext appContext = + Records.newRecord(ApplicationSubmissionContext.class); + if (oldAppId != null) { + appContext.setApplicationId(oldAppId); + } + ContainerLaunchContext amContainer = + Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + + explicitFailover(); + + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + YarnClient client = createAndStartYarnClient(this.conf); + try { + appId = client.submitApplication(appContext); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't submit an application to the Active RM"); + } + return appId; + } + + private YarnClient createAndStartYarnClient(Configuration conf) { + Configuration configuration = new YarnConfiguration(conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(configuration); + client.start(); + return client; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 7c34966..1c80fdc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -237,6 +240,8 @@ public void testGetApplications() throws YarnException, IOException { private List reports; GetApplicationsResponse mockAppResponse = mock(GetApplicationsResponse.class); + GetApplicationReportResponse mockResponse = + mock(GetApplicationReportResponse.class); public MockYarnClient() { super(); @@ -246,8 +251,6 @@ public MockYarnClient() { @Override public void start() { rmClient = mock(ApplicationClientProtocol.class); - GetApplicationReportResponse mockResponse = - mock(GetApplicationReportResponse.class); mockReport = mock(ApplicationReport.class); try{ when(rmClient.getApplicationReport(any( @@ -259,6 +262,8 @@ public void start() { KillApplicationRequest.class))) .thenReturn(KillApplicationResponse.newInstance(false)).thenReturn( KillApplicationResponse.newInstance(true)); + when(rmClient.submitApplication(any(SubmitApplicationRequest.class))) + .thenReturn(SubmitApplicationResponse.newInstance()); } catch (YarnException e) { Assert.fail("Exception is not expected."); } catch (IOException e) { @@ -272,6 +277,18 @@ public ApplicationClientProtocol getRMClient() { } @Override + public ApplicationId submitApplication( + ApplicationSubmissionContext appContext) throws YarnException, + 
IOException { + when(rmClient.getApplicationReport( + any(GetApplicationReportRequest.class))).thenThrow( + new ApplicationNotFoundException( + "Throw ApplicationNotFoundException")).thenReturn( + mockResponse); + return super.submitApplication(appContext); + } + + @Override public List getApplications( Set applicationTypes, EnumSet applicationStates) throws YarnException, IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index fdde381..c983f86 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -300,16 +300,13 @@ public SubmitApplicationResponse submitApplication( throw RPCUtil.getRemoteException(ie); } - // Though duplication will checked again when app is put into rmContext, - // but it is good to fail the invalid submission as early as possible. + // Duplicates are checked on the client side before the application is submitted. + // If the application is already present here, a failover or RM restart + // happened after RMStateStore saved the ApplicationState. + // The earlier submission therefore succeeded, so + // return the SubmitApplicationResponse. if (rmContext.getRMApps().get(applicationId) != null) { - String message = "Application with id " + applicationId + - " is already present! 
Cannot add a duplicate!"; - LOG.warn(message); - RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, - message, "ClientRMService", "Exception in submitting application", - applicationId); - throw RPCUtil.getRemoteException(message); + return SubmitApplicationResponse.newInstance(); } if (submissionContext.getQueue() == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 7c49681..30424a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -407,15 +407,6 @@ public void handle(Event event) {} Assert.assertEquals("app name doesn't match", name, app2.getName()); Assert.assertEquals("app queue doesn't match", queue, app2.getQueue()); - // duplicate appId - try { - rmService.submitApplication(submitRequest2); - Assert.fail("Exception is expected."); - } catch (YarnException e) { - Assert.assertTrue("The thrown exception is not expected.", - e.getMessage().contains("Cannot add a duplicate!")); - } - GetApplicationsRequest getAllAppsRequest = GetApplicationsRequest.newInstance(new HashSet()); GetApplicationsResponse getAllApplicationsResponse =