diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 51a7353..9a1bc12 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -172,7 +172,10 @@ public YarnClientApplication createApplication() submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); - appContext.setApplicationId(applicationId); + if (applicationId == null) { + applicationId = getNewApplication().getApplicationId(); + appContext.setApplicationId(applicationId); + } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index a57d507..e7222f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -35,6 +35,9 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -109,10 +113,7 @@ public void teardown() { private void verifyClientConnection() { int numRetries = 3; while(numRetries-- > 0) { - Configuration conf = new YarnConfiguration(this.conf); - YarnClient client = YarnClient.createYarnClient(); - client.init(conf); - client.start(); + YarnClient client = createAndStartYarnClient(conf); try { client.getApplications(); return; @@ -252,4 +253,101 @@ private void verifyExpectedException(String exceptionMessage){ .contains("Application with id '" + fakeAppId + "' " + "doesn't exist in RM.")); } + + @Test + public void testAppSubmissionWithApplicationId() throws IOException, + InterruptedException, YarnException { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + ApplicationId oldAppId = createApplication(); + + // Do the failover + explicitFailover(); + + ApplicationId newAppId = submitApplication(oldAppId); + assertEquals(newAppId, oldAppId); + } + + @Test + public void testAppSubmissionWithoutApplicationId() throws IOException, + InterruptedException, YarnException { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + ApplicationId newAppId = submitApplication(null); + assertTrue(newAppId != null); + } + + private ApplicationId createApplication() { + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + YarnClient client = createAndStartYarnClient(this.conf); + try { + appId = client.createApplication().getApplicationSubmissionContext() + .getApplicationId(); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't get an applicationId from the Active RM"); + } + return appId; + } + + private ApplicationId submitApplication(ApplicationId oldAppId) + throws IOException { + ApplicationSubmissionContext appContext = + Records.newRecord(ApplicationSubmissionContext.class); + if (oldAppId != null) { + appContext.setApplicationId(oldAppId); + } + ContainerLaunchContext amContainer = + Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + + explicitFailover(); + + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + YarnClient client = createAndStartYarnClient(this.conf); + try { + appId = client.submitApplication(appContext); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't submit an application to the Active RM"); + } + return appId; + } + + private YarnClient createAndStartYarnClient(Configuration conf) { + Configuration configuration = new YarnConfiguration(conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(configuration); + client.start(); + return client; + } + }