diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java index e3d7c9c..1b8086c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java @@ -23,13 +23,15 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Records; /** *

The response sent by the ResourceManager to a client on * application submission.

* - *

Currently, this is empty.

+ *

The response includes the {@link ApplicationId} of + * the submitted application

* * @see ApplicationClientProtocol#submitApplication(SubmitApplicationRequest) */ @@ -38,9 +40,23 @@ public abstract class SubmitApplicationResponse { @Private @Unstable - public static SubmitApplicationResponse newInstance() { + public static SubmitApplicationResponse newInstance( + ApplicationId applicationId) { SubmitApplicationResponse response = Records.newRecord(SubmitApplicationResponse.class); + response.setApplicationId(applicationId); return response; } + + /** + * Get the ApplicationId of the submitted application. + * @return ApplicationId of the submitted application + */ + @Private + @Unstable + public abstract ApplicationId getApplicationId(); + + @Private + @Unstable + public abstract void setApplicationId(ApplicationId applicationId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..c18e978 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -112,6 +112,7 @@ message SubmitApplicationRequestProto { } message SubmitApplicationResponseProto { + optional ApplicationIdProto application_id = 1; } message KillApplicationRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 51a7353..cf4ca6b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -176,7 +176,7 @@ public YarnClientApplication createApplication() SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); - rmClient.submitApplication(request); + applicationId = rmClient.submitApplication(request).getApplicationId(); int pollCount = 0; long startTime = System.currentTimeMillis(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index a57d507..1e425cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -35,6 +35,9 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -109,10 +113,7 @@ public void teardown() { private void verifyClientConnection() { int numRetries = 3; while(numRetries-- > 0) { - Configuration conf = new YarnConfiguration(this.conf); - YarnClient client = YarnClient.createYarnClient(); - client.init(conf); - client.start(); + YarnClient client = createAndStartYarnClient(conf); try { client.getApplications(); return; @@ -252,4 +253,112 @@ private void verifyExpectedException(String exceptionMessage){ .contains("Application with id '" + fakeAppId + "' " + "doesn't exist in RM.")); } + + @Test + public void testAppSubmissionWithApplicationId() throws IOException, + InterruptedException, YarnException { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + + // rm1 is active + // get the applicationId from rm1 + ApplicationId oldAppId = createApplication(); + + // Do the failover + explicitFailover(); + verifyConnections(); + + // now rm2 is Active, + // submit the application with ApplicationId which was assigned by rm1 + ApplicationId newAppId = submitApplication(oldAppId); + + // make sure rm2 accept the original ApplicationId + assertEquals(newAppId, oldAppId); + } + + @Test + public void testAppSubmissionWithoutApplicationId() throws IOException, + InterruptedException, YarnException { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + + // submit the application without ApplicationId specified + ApplicationId newAppId = submitApplication(null); + + // make sure this application is submitted successfully + // and rm has assigned an ApplicationId for it. + assertTrue(newAppId != null); + } + + private ApplicationId createApplication() { + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + YarnClient client = createAndStartYarnClient(this.conf); + try { + appId = client.createApplication().getApplicationSubmissionContext() + .getApplicationId(); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't get an applicationId from the Active RM"); + } + return appId; + } + + private ApplicationId submitApplication(ApplicationId oldAppId) + throws IOException { + ApplicationSubmissionContext appContext = + Records.newRecord(ApplicationSubmissionContext.class); + if (oldAppId != null) { + appContext.setApplicationId(oldAppId); + } + ContainerLaunchContext amContainer = + Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + + int numRetries = 3; + ApplicationId appId = null; + while (numRetries-- > 0) { + YarnClient client = createAndStartYarnClient(this.conf); + try { + appId = client.submitApplication(appContext); + break; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + if (appId == null) { + fail("Client couldn't submit an application to the Active RM"); + } + return appId; + } + + private YarnClient createAndStartYarnClient(Configuration conf) { + Configuration configuration = new YarnConfiguration(conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(configuration); + client.start(); + return client; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 7c34966..9947ec0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -117,6 +119,7 @@ public void testSubmitApplication() { System.currentTimeMillis(), i); when(context.getApplicationId()).thenReturn(applicationId); ((MockYarnClient) client).setYarnApplicationState(exitStates[i]); + ((MockYarnClient) client).setCurrentApplicationId(applicationId); try { client.submitApplication(context); } catch (YarnException e) { @@ -290,6 +293,17 @@ public void setYarnApplicationState(YarnApplicationState state) { YarnApplicationState.NEW_SAVING, state); } + public void setCurrentApplicationId(ApplicationId applicationId) { + try { + when(rmClient.submitApplication(any(SubmitApplicationRequest.class))) + .thenReturn(SubmitApplicationResponse.newInstance(applicationId)); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } catch (IOException e) { + Assert.fail("Exception is not expected."); + } + } + public List getReports() { return this.reports; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SubmitApplicationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SubmitApplicationResponsePBImpl.java index 9e12776..b3b1454 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SubmitApplicationResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SubmitApplicationResponsePBImpl.java @@ -22,7 +22,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -33,6 +37,8 @@ SubmitApplicationResponseProto.Builder builder = null; boolean viaProto = false; + private ApplicationId applicationId = null; + public SubmitApplicationResponsePBImpl() { builder = SubmitApplicationResponseProto.newBuilder(); } @@ -43,6 +49,7 @@ public SubmitApplicationResponsePBImpl(SubmitApplicationResponseProto proto) { } public SubmitApplicationResponseProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; @@ -67,4 +74,55 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SubmitApplicationResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + @Override + public ApplicationId getApplicationId() { + SubmitApplicationResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) + builder.clearApplicationId(); + this.applicationId = applicationId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index fdde381..e513ac7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -300,6 +300,13 @@ public SubmitApplicationResponse submitApplication( throw RPCUtil.getRemoteException(ie); } + // if the ApplicationId is not specified, + // assign a new ApplicationId. + if (applicationId == null) { + applicationId = getNewApplicationId(); + submissionContext.setApplicationId(applicationId); + } + // Though duplication will checked again when app is put into rmContext, // but it is good to fail the invalid submission as early as possible. if (rmContext.getRMApps().get(applicationId) != null) { @@ -348,8 +355,8 @@ public SubmitApplicationResponse submitApplication( throw e; } - SubmitApplicationResponse response = recordFactory - .newRecordInstance(SubmitApplicationResponse.class); + SubmitApplicationResponse response = + SubmitApplicationResponse.newInstance(applicationId); return response; }