diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index c36d0c4..02ac31b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -181,34 +181,43 @@ public YarnClientApplication createApplication() int pollCount = 0; long startTime = System.currentTimeMillis(); + // TODO: revisit after YARN-1410. + // The applicationId might be changed while (true) { - YarnApplicationState state = - getApplicationReport(applicationId).getYarnApplicationState(); - if (!state.equals(YarnApplicationState.NEW) && - !state.equals(YarnApplicationState.NEW_SAVING)) { - LOG.info("Submitted application " + applicationId); - break; - } + try { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + LOG.info("Submitted application " + applicationId); + break; + } - long elapsedMillis = System.currentTimeMillis() - startTime; - if (enforceAsyncAPITimeout() && - elapsedMillis >= asyncApiPollTimeoutMillis) { - throw new YarnException("Timed out while waiting for application " + - applicationId + " to be submitted successfully"); - } + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() && + elapsedMillis >= asyncApiPollTimeoutMillis) { + throw new YarnException("Timed out while waiting for application " + + applicationId + " to be submitted successfully"); + } - // Notify the client through the log every 10 poll, in case the client - // is blocked here too long. 
- if (++pollCount % 10 == 0) { - LOG.info("Application submission is not finished, " + - "submitted application " + applicationId + - " is still in " + state); - } - try { - Thread.sleep(submitPollIntervalMillis); - } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting for application " + applicationId - + " to be successfully submitted."); + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(submitPollIntervalMillis); + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for application " + + applicationId + + " to be successfully submitted."); + } + } catch (ApplicationNotFoundException ex) { + // FailOver or RM restart happens before RMStateStore saves + // ApplicationState + rmClient.submitApplication(request); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index c91ce35..d6ffee7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -31,6 +31,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; @@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -221,13 +224,24 @@ public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType, - boolean waitForAccepted, boolean keepContainers) - throws Exception { - ApplicationClientProtocol client = getClientRMService(); - GetNewApplicationResponse resp = client.getNewApplication(Records - .newRecord(GetNewApplicationRequest.class)); - ApplicationId appId = resp.getApplicationId(); + boolean waitForAccepted, boolean keepContainers) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + false, null); + } + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId) throws Exception { + ApplicationId appId = isAppIdProvided ? applicationId : null; + ApplicationClientProtocol client = getClientRMService(); + if (! 
isAppIdProvided) { + GetNewApplicationResponse resp = client.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + appId = resp.getApplicationId(); + } SubmitApplicationRequest req = Records .newRecord(SubmitApplicationRequest.class); ApplicationSubmissionContext sub = Records @@ -502,4 +516,12 @@ public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) return am; } + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + ApplicationClientProtocol client = getClientRMService(); + GetApplicationReportResponse response = + client.getApplicationReport(GetApplicationReportRequest + .newInstance(appId)); + return response.getApplicationReport(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java new file mode 100644 index 0000000..6cbea25 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + + +public class RMHATestBase extends ClientBaseWithFixes{ + + private static final int ZK_TIMEOUT_MS = 5000; + private static StateChangeRequestInfo requestInfo = + new StateChangeRequestInfo( + 
HAServiceProtocol.RequestSource.REQUEST_BY_USER); + protected Configuration configuration = new YarnConfiguration(); + static MockRM rm1 = null; + static MockRM rm2 = null; + Configuration confForRM1; + Configuration confForRM2; + + @Before + public void setup() throws Exception { + configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + configuration.set(YarnConfiguration.RM_STORE, + ZKRMStateStore.class.getName()); + configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster"); + int base = 100; + for (String confKey : YarnConfiguration + .getServiceAddressConfKeys(configuration)) { + configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:" + + (base + 20)); + configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:" + + (base + 40)); + base = base * 2; + } + confForRM1 = new Configuration(configuration); + confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1"); + confForRM2 = new Configuration(configuration); + confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2"); + } + + @After + public void teardown() { + if (rm1 != null) { + rm1.stop(); + } + if (rm2 != null) { + rm2.stop(); + } + } + + protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.RUNNING); + return am; + } + + protected void startRMs() throws IOException { + rm1 = new MockRM(confForRM1); + rm2 = new 
MockRM(confForRM2); + startRMs(rm1, confForRM1, rm2, confForRM2); + + } + + protected void startRMsWithCustomizedRMAppManager() throws IOException { + final Configuration conf1 = new Configuration(confForRM1); + + rm1 = new MockRM(conf1) { + @Override + protected RMAppManager createRMAppManager() { + return new MyRMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf1); + } + }; + + rm2 = new MockRM(confForRM2); + + startRMs(rm1, conf1, rm2, confForRM2); + } + + private static class MyRMAppManager extends RMAppManager { + + private Configuration conf; + private RMContext rmContext; + + public MyRMAppManager(RMContext context, YarnScheduler scheduler, + ApplicationMasterService masterService, + ApplicationACLsManager applicationACLsManager, Configuration conf) { + super(context, scheduler, masterService, applicationACLsManager, conf); + this.conf = conf; + this.rmContext = context; + } + + @Override + protected void submitApplication( + ApplicationSubmissionContext submissionContext, long submitTime, + String user, boolean isRecovered, RMState state) throws YarnException { + //Do nothing, just add the application to RMContext + RMAppImpl application = + new RMAppImpl(submissionContext.getApplicationId(), this.rmContext, + this.conf, submissionContext.getApplicationName(), user, + submissionContext.getQueue(), submissionContext, + this.rmContext.getScheduler(), + this.rmContext.getApplicationMasterService(), + submitTime, submissionContext.getApplicationType(), + submissionContext.getApplicationTags()); + this.rmContext.getRMApps().put(submissionContext.getApplicationId(), + application); + //Do not send RMAppEventType.START event + //so the state of Application will not reach to NEW_SAVING state. 
+ } + } + + protected boolean isFinalState(RMAppState state) { + return state.equals(RMAppState.FINISHING) + || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED) + || state.equals(RMAppState.KILLED); + } + + protected void explicitFailover() throws IOException { + rm1.adminService.transitionToStandby(requestInfo); + rm2.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceState.ACTIVE); + } + + protected void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2, + Configuration confForRM2) throws IOException { + rm1.init(confForRM1); + rm1.start(); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + + rm2.init(confForRM2); + rm2.start(); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + + rm1.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.ACTIVE); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java index 918de80..c0a623d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java @@ -24,86 +24,29 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.ClientBaseWithFixes; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; -import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{ +public class TestKillApplicationWithRMHA extends RMHATestBase{ public 
static final Log LOG = LogFactory .getLog(TestKillApplicationWithRMHA.class); - private static final int ZK_TIMEOUT_MS = 5000; - private static StateChangeRequestInfo requestInfo = - new StateChangeRequestInfo( - HAServiceProtocol.RequestSource.REQUEST_BY_USER); - private Configuration configuration = new YarnConfiguration(); - static MockRM rm1 = null; - static MockRM rm2 = null; - Configuration confForRM1; - Configuration confForRM2; - - @Before - public void setup() throws Exception { - configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); - configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); - configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); - configuration.set(YarnConfiguration.RM_STORE, - ZKRMStateStore.class.getName()); - configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); - configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); - configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster"); - int base = 100; - for (String confKey : YarnConfiguration - .getServiceAddressConfKeys(configuration)) { - configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:" - + (base + 20)); - configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:" - + (base + 40)); - base = base * 2; - } - confForRM1 = new Configuration(configuration); - confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1"); - confForRM2 = new Configuration(configuration); - confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2"); - } - - @After - public void teardown() { - if (rm1 != null) { - rm1.stop(); - } - if (rm2 != null) { - rm2.stop(); - } - } @Test (timeout = 20000) public void testKillAppWhenFailoverHappensAtNewState() @@ -221,18 +164,6 @@ public void testKillAppWhenFailOverHappensDuringApplicationKill() } - private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) - throws Exception { - RMAppAttempt attempt = app.getCurrentAppAttempt(); - 
nm.nodeHeartbeat(true); - MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); - am.registerAppAttempt(); - rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); - rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(), - RMAppAttemptState.RUNNING); - return am; - } - private void failOverAndKillApp(ApplicationId appId, ApplicationAttemptId appAttemptId, RMAppState initialRMAppState, RMAppAttemptState initialRMAppAttemptState, @@ -256,29 +187,6 @@ private void failOverAndKillApp(ApplicationId appId, killApplication(rm2, appId, null, initialRMAppState); } - private void startRMs() throws IOException { - rm1 = new MockRM(confForRM1); - rm2 = new MockRM(confForRM2); - startRMs(rm1, confForRM1, rm2, confForRM2); - - } - - private void startRMsWithCustomizedRMAppManager() throws IOException { - final Configuration conf1 = new Configuration(confForRM1); - - rm1 = new MockRM(conf1) { - @Override - protected RMAppManager createRMAppManager() { - return new MyRMAppManager(this.rmContext, this.scheduler, - this.masterService, this.applicationACLsManager, conf1); - } - }; - - rm2 = new MockRM(confForRM2); - - startRMs(rm1, conf1, rm2, confForRM2); - } - private void startRMsWithCustomizedClientRMService() throws IOException { final Configuration conf1 = new Configuration(confForRM1); @@ -296,39 +204,6 @@ protected ClientRMService createClientRMService() { startRMs(rm1, conf1, rm2, confForRM2); } - private static class MyRMAppManager extends RMAppManager { - - private Configuration conf; - private RMContext rmContext; - - public MyRMAppManager(RMContext context, YarnScheduler scheduler, - ApplicationMasterService masterService, - ApplicationACLsManager applicationACLsManager, Configuration conf) { - super(context, scheduler, masterService, applicationACLsManager, conf); - this.conf = conf; - this.rmContext = context; - } - - @Override - protected void submitApplication( - ApplicationSubmissionContext submissionContext, long submitTime, - String user, boolean 
isRecovered, RMState state) throws YarnException { - //Do nothing, just add the application to RMContext - RMAppImpl application = - new RMAppImpl(submissionContext.getApplicationId(), this.rmContext, - this.conf, submissionContext.getApplicationName(), user, - submissionContext.getQueue(), submissionContext, - this.rmContext.getScheduler(), - this.rmContext.getApplicationMasterService(), - submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags()); - this.rmContext.getRMApps().put(submissionContext.getApplicationId(), - application); - //Do not send RMAppEventType.START event - //so the state of Application will not reach to NEW_SAVING state. - } - } - private static class MyClientRMService extends ClientRMService { private RMContext rmContext; @@ -366,21 +241,6 @@ public KillApplicationResponse forceKillApplication( } } - private boolean isFinalState(RMAppState state) { - return state.equals(RMAppState.FINISHING) - || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED) - || state.equals(RMAppState.KILLED); - } - - private void explicitFailover() throws IOException { - rm1.adminService.transitionToStandby(requestInfo); - rm2.adminService.transitionToActive(requestInfo); - Assert.assertTrue(rm1.getRMContext().getHAServiceState() - == HAServiceState.STANDBY); - Assert.assertTrue(rm2.getRMContext().getHAServiceState() - == HAServiceState.ACTIVE); - } - private void killApplication(MockRM rm, ApplicationId appId, ApplicationAttemptId appAttemptId, RMAppState rmAppState) throws Exception { @@ -396,21 +256,4 @@ private void killApplication(MockRM rm, ApplicationId appId, // no new attempt is created. 
Assert.assertEquals(1, loadedApp0.getAppAttempts().size()); } - - private void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2, - Configuration confForRM2) throws IOException { - rm1.init(confForRM1); - rm1.start(); - Assert.assertTrue(rm1.getRMContext().getHAServiceState() - == HAServiceState.STANDBY); - - rm2.init(confForRM2); - rm2.start(); - Assert.assertTrue(rm2.getRMContext().getHAServiceState() - == HAServiceState.STANDBY); - - rm1.adminService.transitionToActive(requestInfo); - Assert.assertTrue(rm1.getRMContext().getHAServiceState() - == HAServiceState.ACTIVE); - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java new file mode 100644 index 0000000..a1f9407 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.junit.Test; + + +public class TestSubmitApplicationWithRMHA extends RMHATestBase{ + + public static final Log LOG = LogFactory + .getLog(TestSubmitApplicationWithRMHA.class); + + + // There are two scenarios when RM failover happens + // after SubmitApplication Call: + // 1) RMStateStore already saved the ApplicationState when failover happens + // 2) RMStateStore did not save the ApplicationState when failover happens + + @Test + public void + testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState() + throws Exception { + // Test scenario 1 when RM failover happens after SubmitApplication Call: + // RMStateStore already saved the ApplicationState when failover happens + startRMs(); + + // Submit Application + // After submission, the applicationState will be saved in RMStateStore. + RMApp app0 = rm1.submitApp(200); + + // Do the failover + explicitFailover(); + + // Since the applicationState has already been saved in RMStateStore + // before failover happens, the current active rm can load the previous + // applicationState. + ApplicationReport appReport = + rm2.getApplicationReport(app0.getApplicationId()); + + // verify previous submission is successful. 
+ Assert.assertFalse(appReport.getYarnApplicationState() + == YarnApplicationState.NEW); + Assert.assertFalse(appReport.getYarnApplicationState() + == YarnApplicationState.NEW_SAVING); + } + + @Test + public void + testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState() + throws Exception { + // Test scenario 2 when RM failover happens after SubmitApplication Call: + // RMStateStore did not save the ApplicationState when failover happens. + // Using customized RMAppManager. + startRMsWithCustomizedRMAppManager(); + + // Submit Application + // After submission, the applicationState will not be saved in RMStateStore + RMApp app0 = + rm1.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false); + + // Do the failover + explicitFailover(); + + // Since the applicationState is not saved in RMStateStore + // when failover happens, the current active RM can not load + // the previous applicationState. + // Expect ApplicationNotFoundException by calling getApplicationReport(). 
+ try { + rm2.getApplicationReport(app0.getApplicationId()); + Assert.fail("Should get ApplicationNotFoundException here"); + } catch (ApplicationNotFoundException ex) { + // expected ApplicationNotFoundException + } + + // Submit the application with previous ApplicationId from current active RM + // This mimics the behavior of YarnClient, which will re-submit the + // Application with the previous applicationId + // when it catches an ApplicationNotFoundException + RMApp app1 = + rm2.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false, true, app0.getApplicationId()); + + int maxWaittingTimes = 20; + int count = 0; + while (true) { + YarnApplicationState state = + rm2.getApplicationReport(app1.getApplicationId()) + .getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + break; + } + if (count > maxWaittingTimes) { + break; + } + Thread.sleep(200); + count ++; + } + + // Verify submission is successful + Assert.assertFalse(rm2.getApplicationReport(app1.getApplicationId()) + .getYarnApplicationState() == YarnApplicationState.NEW); + Assert.assertFalse(rm2.getApplicationReport(app1.getApplicationId()) + .getYarnApplicationState() == YarnApplicationState.NEW_SAVING); + Assert.assertEquals(app0.getApplicationId(), app1.getApplicationId()); + } +}