diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 7855042..983bfcf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -425,4 +425,9 @@ public void handle(RMAppManagerEvent event) { LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + + @VisibleForTesting + public RMContext getRMContext() { + return this.rmContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 63efe8f..50c9ad5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -289,6 +289,73 @@ public SubmitApplicationResponse run() { return getRMContext().getRMApps().get(appId); } + public RMApp submitAppWithoutWaitForAccepted(int masterMemory, String name, + String user, Map acls, boolean unmanaged, + String queue, int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers) + throws Exception { + ApplicationClientProtocol client = getClientRMService(); + GetNewApplicationResponse resp = client.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + ApplicationId appId = resp.getApplicationId(); + + SubmitApplicationRequest req = Records + .newRecord(SubmitApplicationRequest.class); + ApplicationSubmissionContext sub = Records + .newRecord(ApplicationSubmissionContext.class); + sub.setKeepContainersAcrossApplicationAttempts(keepContainers); + sub.setApplicationId(appId); + sub.setApplicationName(name); + sub.setMaxAppAttempts(maxAppAttempts); + if(unmanaged) { + sub.setUnmanagedAM(true); + } + if (queue != null) { + sub.setQueue(queue); + } + sub.setApplicationType(appType); + ContainerLaunchContext clc = Records + .newRecord(ContainerLaunchContext.class); + final Resource capability = Records.newRecord(Resource.class); + capability.setMemory(masterMemory); + sub.setResource(capability); + clc.setApplicationACLs(acls); + if (ts != null && UserGroupInformation.isSecurityEnabled()) { + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setTokens(securityTokens); + } + sub.setAMContainerSpec(clc); + req.setApplicationSubmissionContext(sub); + UserGroupInformation fakeUser = + UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); + PrivilegedAction action = + new PrivilegedAction() { + ApplicationClientProtocol client; + SubmitApplicationRequest req; + @Override + public SubmitApplicationResponse run() { + try { + return client.submitApplication(req); + } catch (YarnException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + PrivilegedAction setClientReq( + ApplicationClientProtocol client, SubmitApplicationRequest req) { + this.client = client; + this.req = req; + return this; + } + }.setClientReq(client, req); + fakeUser.doAs(action); + return getRMContext().getRMApps().get(appId); + } + public MockNM registerNode(String nodeIdStr, int memory) throws Exception { MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); nm.registerNode(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java new file mode 100644 index 0000000..7aaa618 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{ + + public static final Log LOG = LogFactory + .getLog(TestKillApplicationWithRMHA.class); + private static final int ZK_TIMEOUT_MS = 1000; + private static StateChangeRequestInfo requestInfo = + new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + private Configuration configuration = new YarnConfiguration(); + static MockRM rm1 = null; + static MockRM rm2 = null; + + @Before + public void setup() throws Exception { + configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + configuration.set(YarnConfiguration.RM_STORE, + ZKRMStateStore.class.getName()); + configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster"); + int base = 100; + for (String confKey : YarnConfiguration + .getServiceAddressConfKeys(configuration)) { + configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:" + + (base + 20)); + configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:" + + (base + 40)); + base = base * 2; + } + Configuration conf1 = new Configuration(configuration); + conf1.set(YarnConfiguration.RM_HA_ID, "rm1"); + Configuration conf2 = new Configuration(configuration); + conf2.set(YarnConfiguration.RM_HA_ID, "rm2"); + } + + @After + public void teardown() { + if (rm1 != null) { + rm1.stop(); + } + if (rm2 != null) { + rm2.stop(); + } + } + + @Test + public void testRMHAKillAppWhenFailOverHappensBeforeNewSavingState() + throws Exception { + startRMsWithCustomizedRMAppManager(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app0 = + rm1.submitAppWithoutWaitForAccepted(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + true, false); + // failover and kill application + try { + failOverAndKillAppWithoutAttempt(app0.getApplicationId()); + fail("Should get an exception here"); + } catch (ApplicationNotFoundException ex) { + Assert.assertTrue(ex.getMessage().contains( + "Trying to kill an absent application " + app0.getApplicationId())); + } + } + + @Test + public void testRMHAKillAppWhenFailoverHappensBeforeFinalState() + throws Exception { + startRMs(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, + rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // failover and kill application + failOverAndKillApp(app0.getApplicationId(), + am0.getApplicationAttemptId()); + } + + @Test + public void testRMHAKillAppWhenFailoverHappensAtFinalState() + throws Exception { + startRMs(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, + rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // kill the app. + rm1.killApp(app0.getApplicationId()); + rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED); + rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED); + + // failover and kill application + failOverAndKillApp(app0.getApplicationId(), + am0.getApplicationAttemptId()); + } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + + private void failOverAndKillApp(ApplicationId appId, + ApplicationAttemptId appAttemptId) throws Exception { + rm1.adminService.transitionToStandby(requestInfo); + rm2.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceState.ACTIVE); + rm2.killApp(appId); + RMApp loadedApp0 = + rm2.getRMContext().getRMApps().get(appId); + rm2.waitForState(appId, RMAppState.KILLED); + rm2.waitForState(appAttemptId, RMAppAttemptState.KILLED); + // no new attempt is created. + Assert.assertEquals(1, loadedApp0.getAppAttempts().size()); + } + + private void failOverAndKillAppWithoutAttempt(ApplicationId appId) + throws Exception { + rm1.adminService.transitionToStandby(requestInfo); + rm2.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceState.ACTIVE); + rm2.killApp(appId); + RMApp loadedApp0 = + rm2.getRMContext().getRMApps().get(appId); + rm2.waitForState(appId, RMAppState.KILLED); + // no new attempt is created. + Assert.assertEquals(1, loadedApp0.getAppAttempts().size()); + } + + private void startRMs() throws IOException { + Configuration conf1 = new Configuration(configuration); + conf1.set(YarnConfiguration.RM_HA_ID, "rm1"); + Configuration conf2 = new Configuration(configuration); + conf2.set(YarnConfiguration.RM_HA_ID, "rm2"); + + rm1 = new MockRM(conf1); + rm1.init(conf1); + rm1.start(); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + + rm2 = new MockRM(conf2); + rm2.init(conf1); + rm2.start(); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + + rm1.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.ACTIVE); + } + + private void startRMsWithCustomizedRMAppManager() throws IOException { + final Configuration conf1 = new Configuration(configuration); + conf1.set(YarnConfiguration.RM_HA_ID, "rm1"); + Configuration conf2 = new Configuration(configuration); + conf2.set(YarnConfiguration.RM_HA_ID, "rm2"); + + rm1 = new MockRM(conf1) { + @Override + protected RMAppManager createRMAppManager() { + return new MyRMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf1); + } + }; + rm1.init(conf1); + rm1.start(); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + + rm2 = new MockRM(conf2); + rm2.init(conf1); + rm2.start(); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceState.STANDBY); + + rm1.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceState.ACTIVE); + } + + private static class MyRMAppManager extends RMAppManager { + + private Configuration conf; + public MyRMAppManager(RMContext context, YarnScheduler scheduler, + ApplicationMasterService masterService, + ApplicationACLsManager applicationACLsManager, Configuration conf) { + super(context, scheduler, masterService, applicationACLsManager, conf); + this.conf = conf; + } + + @Override + protected void submitApplication( + ApplicationSubmissionContext submissionContext, long submitTime, + String user, boolean isRecovered, RMState state) throws YarnException { + //Do nothing, just add the application to RMContext + RMContext rmContext = getRMContext(); + RMAppImpl application = + new RMAppImpl(submissionContext.getApplicationId(), rmContext, + this.conf, submissionContext.getApplicationName(), user, + submissionContext.getQueue(), submissionContext, + rmContext.getScheduler(), + rmContext.getApplicationMasterService(), + submitTime, submissionContext.getApplicationType(), + submissionContext.getApplicationTags()); + rmContext.getRMApps().put(submissionContext.getApplicationId(), + application); + //Do not send RMAppEventType.START event + //so the state of Application will not reach to NEW_SAVING state. + } + } +}