diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java index e3d7c9c..1b8086c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SubmitApplicationResponse.java @@ -23,13 +23,15 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Records; /** *
The response sent by the ResourceManager to a client on
* application submission.
Currently, this is empty.
+ *The response includes the {@link ApplicationId} of + * the submitted application
* * @see ApplicationClientProtocol#submitApplication(SubmitApplicationRequest) */ @@ -38,9 +40,23 @@ public abstract class SubmitApplicationResponse { @Private @Unstable - public static SubmitApplicationResponse newInstance() { + public static SubmitApplicationResponse newInstance( + ApplicationId applicationId) { SubmitApplicationResponse response = Records.newRecord(SubmitApplicationResponse.class); + response.setApplicationId(applicationId); return response; } + + /** + * Get theApplicationId of the submitted application.
+ * @return ApplicationId of the submitted application
+ */
+ @Private
+ @Unstable
+ public abstract ApplicationId getApplicationId();
+
+ @Private
+ @Unstable
+ public abstract void setApplicationId(ApplicationId applicationId);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index a1f6d2e..c18e978 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -112,6 +112,7 @@ message SubmitApplicationRequestProto {
}
message SubmitApplicationResponseProto {
+ optional ApplicationIdProto application_id = 1;
}
message KillApplicationRequestProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 51a7353..cf4ca6b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -176,7 +176,7 @@ public YarnClientApplication createApplication()
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
- rmClient.submitApplication(request);
+ applicationId = rmClient.submitApplication(request).getApplicationId();
int pollCount = 0;
long startTime = System.currentTimeMillis();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
index a57d507..6de9826 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -35,6 +35,9 @@
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -43,6 +46,7 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -109,10 +113,7 @@ public void teardown() {
private void verifyClientConnection() {
int numRetries = 3;
while(numRetries-- > 0) {
- Configuration conf = new YarnConfiguration(this.conf);
- YarnClient client = YarnClient.createYarnClient();
- client.init(conf);
- client.start();
+ YarnClient client = createAndStartYarnClient(conf);
try {
client.getApplications();
return;
@@ -252,4 +253,125 @@ private void verifyExpectedException(String exceptionMessage){
.contains("Application with id '" + fakeAppId + "' " +
"doesn't exist in RM."));
}
+
+ @Test (timeout = 15000)
+ public void testAppSubmissionWithApplicationId() throws IOException,
+ InterruptedException, YarnException {
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+
+ // rm1 is active
+ // get the applicationId from rm1
+ ApplicationId oldAppId = createApplication();
+
+ // Do the failover
+ explicitFailover();
+ verifyConnections();
+
+ // now rm2 is Active,
+ // submit the application with ApplicationId which was assigned by rm1
+ ApplicationId newAppId = submitApplication(oldAppId);
+
+ // make sure rm2 accept the original ApplicationId
+ assertEquals(newAppId, oldAppId);
+ }
+
+ @Test (timeout = 15000)
+ public void testAppSubmissionWithoutApplicationId() throws IOException,
+ InterruptedException, YarnException {
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+
+ // submit the application without ApplicationId specified
+ ApplicationId newAppId = submitApplication(null);
+
+ // make sure this application is submitted successfully
+ // and rm has assigned an ApplicationId for it.
+ assertTrue(newAppId != null);
+
+ // Do the failover
+ explicitFailover();
+ verifyConnections();
+
+ // submit the application without ApplicationId specified again
+ ApplicationId appId = submitApplication(null);
+
+ // make sure this application is submitted successfully
+ // and current active rm has assigned an ApplicationId for it.
+ assertTrue(appId != null);
+
+ // The applicationIds are assigned by two different RMs
+ // they should be different
+ assertFalse(appId == newAppId);
+ }
+
+ private ApplicationId createApplication() {
+ int numRetries = 3;
+ ApplicationId appId = null;
+ while (numRetries-- > 0) {
+ YarnClient client = createAndStartYarnClient(this.conf);
+ try {
+ appId = client.createApplication().getApplicationSubmissionContext()
+ .getApplicationId();
+ break;
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ client.stop();
+ }
+ }
+ if (appId == null) {
+ fail("Client couldn't get an applicationId from the Active RM");
+ }
+ return appId;
+ }
+
+ private ApplicationId submitApplication(ApplicationId oldAppId)
+ throws IOException {
+ ApplicationSubmissionContext appContext =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(oldAppId);
+ ContainerLaunchContext amContainer =
+ Records.newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(10);
+ capability.setVirtualCores(1);
+ appContext.setResource(capability);
+
+ int numRetries = 3;
+ ApplicationId appId = null;
+ while (numRetries-- > 0) {
+ YarnClient client = createAndStartYarnClient(this.conf);
+ try {
+ appId = client.submitApplication(appContext);
+ break;
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ client.stop();
+ }
+ }
+ if (appId == null) {
+ fail("Client couldn't submit an application to the Active RM");
+ }
+ return appId;
+ }
+
+ private YarnClient createAndStartYarnClient(Configuration conf) {
+ Configuration configuration = new YarnConfiguration(conf);
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(configuration);
+ client.start();
+ return client;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 7c34966..9947ec0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -46,6 +46,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -117,6 +119,7 @@ public void testSubmitApplication() {
System.currentTimeMillis(), i);
when(context.getApplicationId()).thenReturn(applicationId);
((MockYarnClient) client).setYarnApplicationState(exitStates[i]);
+ ((MockYarnClient) client).setCurrentApplicationId(applicationId);
try {
client.submitApplication(context);
} catch (YarnException e) {
@@ -290,6 +293,17 @@ public void setYarnApplicationState(YarnApplicationState state) {
YarnApplicationState.NEW_SAVING, state);
}
+ public void setCurrentApplicationId(ApplicationId applicationId) {
+ try {
+ when(rmClient.submitApplication(any(SubmitApplicationRequest.class)))
+ .thenReturn(SubmitApplicationResponse.newInstance(applicationId));
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ } catch (IOException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ }
+
public List