diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 864980b..8f1f81b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -121,6 +122,7 @@ public GetNewApplicationResponse getNewApplication( */ @Public @Stable + @Idempotent public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException; @@ -184,6 +186,7 @@ public KillApplicationResponse forceKillApplication( */ @Public @Stable + @Idempotent public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotAssignedByCurrentRMException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotAssignedByCurrentRMException.java new file mode 100644 index 0000000..710a1ca --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationIdNotAssignedByCurrentRMException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Exception thrown when a client submits an application with + * an applicationId that was not assigned by the current RM. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ApplicationIdNotAssignedByCurrentRMException extends YarnException { + + private static final long serialVersionUID = 28694408L; + + public ApplicationIdNotAssignedByCurrentRMException(Throwable cause) { + super(cause); + } + + public ApplicationIdNotAssignedByCurrentRMException(String message) { + super(message); + } + + public ApplicationIdNotAssignedByCurrentRMException(String message, + Throwable cause) { + super(message, cause); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 2257c42..06846a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -629,7 +629,7 @@ public boolean run() throws IOException, YarnException { // or an exception thrown to denote some form of a failure LOG.info("Submitting application to ASM"); - yarnClient.submitApplication(appContext); + appId = yarnClient.submitApplication(appContext); // TODO // Try submitting the same request again diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 155ba5d..094c395 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java 
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -68,7 +68,12 @@ protected YarnClient(String name) { * {@link org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse} * objects. *

- * + *

+ * Do not use the {@link ApplicationId} from {@link YarnClientApplication} + * to monitor the application. + * The applicationId may have changed by the time the application has been submitted to + * and accepted by the RM. + *

* @return {@link YarnClientApplication} built for a new application * @throws YarnException * @throws IOException @@ -82,7 +87,9 @@ public abstract YarnClientApplication createApplication() * that it will not return {@link ApplicationId} until the submitted * application has been submitted and accepted by the ResourceManager. *

- * + *

+ * Use the returned {@link ApplicationId} to monitor the application. + *

* @param appContext * {@link ApplicationSubmissionContext} containing all the details * needed to submit a new application diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index a5ff9f6..1f5fec1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationIdNotAssignedByCurrentRMException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -144,13 +145,23 @@ public YarnClientApplication createApplication() public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { + boolean appSubmitComplete = false; ApplicationId applicationId = appContext.getApplicationId(); appContext.setApplicationId(applicationId); SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); - rmClient.submitApplication(request); - + while (! 
appSubmitComplete) { + try { + rmClient.submitApplication(request); + appSubmitComplete = true; + } catch (ApplicationIdNotAssignedByCurrentRMException ex) { + GetNewApplicationResponse newApp = getNewApplication(); + applicationId = newApp.getApplicationId(); + appContext.setApplicationId(applicationId); + request.setApplicationSubmissionContext(appContext); + } + } int pollCount = 0; while (true) { YarnApplicationState state = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index fed26d7..7880bd3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -29,12 +29,19 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.util.Records; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -50,9 +57,10 @@ private static final HAServiceProtocol.StateChangeRequestInfo 
req = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED); - + private static final int maxWaitTimes = 3; private static Configuration conf; private static MiniYARNCluster cluster; + private static YarnClient client; private static void setConfForRM(String rmId, String prefix, String value) { conf.set(HAUtil.addSuffix(prefix, rmId), value); @@ -93,7 +101,18 @@ public static void setup() throws IOException { cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1); cluster.init(conf); cluster.start(); - + int numTries = maxWaitTimes; + while ((cluster.getServiceState() != STATE.STARTED) && (numTries > 0) ) { + numTries --; + try { + Thread.sleep(500); + } catch (Exception e) { + // Do nothing + } + } + client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); cluster.getResourceManager(0).getRMContext().getRMAdminService() .transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); @@ -102,22 +121,17 @@ public static void setup() throws IOException { @AfterClass public static void teardown() { cluster.stop(); + client.stop(); } private void verifyClientConnection() { - int numRetries = 3; + int numRetries = maxWaitTimes; while(numRetries-- > 0) { - Configuration conf = new YarnConfiguration(TestRMFailover.conf); - YarnClient client = YarnClient.createYarnClient(); - client.init(conf); - client.start(); try { client.getApplications(); return; } catch (Exception e) { LOG.error(e); - } finally { - client.stop(); } } fail("Client couldn't connect to the Active RM"); @@ -150,4 +164,44 @@ public void testExplicitFailover() cluster.waitForNodeManagersToConnect(5000)); verifyClientConnection(); } + + @Test + public void testApplicationSubmission() throws YarnException, + InterruptedException, IOException { + assertTrue("NMs failed to connect to the RM", + cluster.waitForNodeManagersToConnect(5000)); + + try { + YarnClientApplication app = 
client.createApplication(); + ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + capability.setVirtualCores(1); + appContext.setResource(capability); + ApplicationId oldAppId = appContext.getApplicationId(); + assertTrue(oldAppId != null); + + // Failover to the second RM + getRMAdminService(0).transitionToStandby(req); + getRMAdminService(1).transitionToActive(req); + assertEquals("Wrong ResourceManager is active", + HAServiceProtocol.HAServiceState.ACTIVE, + getRMAdminService(1).getServiceStatus().getState()); + assertTrue("NMs failed to connect to the RM", + cluster.waitForNodeManagersToConnect(5000)); + + // Submit the Application + ApplicationId newAppId = client.submitApplication(appContext); + assertTrue(newAppId != null); + + assertTrue(newAppId != oldAppId); + assertTrue(newAppId.getClusterTimestamp() != oldAppId + .getClusterTimestamp()); + } catch (Exception e) { + LOG.error(e); + fail("Client couldn't submit application to the Active RM"); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index cd2226f..b17d454 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -81,6 +81,7 @@ import 
org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationIdNotAssignedByCurrentRMException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -288,6 +289,12 @@ public SubmitApplicationResponse submitApplication( throw RPCUtil.getRemoteException(ie); } + if (submissionContext.getApplicationId().getClusterTimestamp() != + ResourceManager.getClusterTimeStamp()) { + throw new ApplicationIdNotAssignedByCurrentRMException( + "The applicationId " + submissionContext.getApplicationId() + + " is not assigned by current RM. "); + } // Though duplication will checked again when app is put into rmContext, // but it is good to fail the invalid submission as early as possible. if (rmContext.getRMApps().get(applicationId) != null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index ca6dc3e..4be4336 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -633,7 +633,7 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) } private static ApplicationId getApplicationId(int id) { - return ApplicationId.newInstance(123456, id); + return 
ApplicationId.newInstance(ResourceManager.getClusterTimeStamp(), id); } private static ApplicationAttemptId getApplicationAttemptId(int id) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index b09ed0b..81d6f32 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -185,7 +185,7 @@ public void testFencing() throws Exception { Map mockMap = mock(Map.class); ApplicationSubmissionContext asc = ApplicationSubmissionContext.newInstance( - ApplicationId.newInstance(1000, 1), + ApplicationId.newInstance(rm1.getClusterTimeStamp(), 1), "testApplication", // app Name "default", // queue name Priority.newInstance(0), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index a6ad9b6..6c20604 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -747,7 +747,7 @@ public void testAppSubmissionWithInvalidDelegationToken() throws Exception { new HashMap()); ApplicationSubmissionContext appSubContext = ApplicationSubmissionContext.newInstance( - ApplicationId.newInstance(1234121, 0), + ApplicationId.newInstance(rm.getClusterTimeStamp(), 0), "BOGUS", "default", Priority.UNDEFINED, amContainer, false, true, 1, Resource.newInstance(1024, 1), "BOGUS"); SubmitApplicationRequest request =