diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 03039daae96..ef854088025 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1508,7 +1508,8 @@ public void transition(RMAppAttemptImpl appAttempt,
           && !appAttempt.submissionContext.getUnmanagedAM()) {
         int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
             .getNumFailedAppAttempts();
-        if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
+        if (appAttempt.rmApp.getMaxAppAttempts() > 1
+            && numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
           keepContainersAcrossAppAttempts = true;
         }
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 9f122cb34bd..1e117154909 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -60,6 +60,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -1167,4 +1171,72 @@ public void testContainersFromPreviousAttemptsWithRMRestart()
     rm2.stop();
     rm1.stop();
   }
+
+  // Test to verify that there is no queue resource leak after app fail.
+  //
+  // 1. Submit an app which is configured to keep containers across app
+  // attempts and should fail after AM finished (am-max-attempts=1).
+  // 2. App is started with 2 containers running on NM1 node.
+  // 3. Preempt the AM of the application which should not count towards max
+  // attempt retry but app will fail immediately.
+  // 4. Verify that the used resource of queue should be cleared normally after
+  // app fail.
+  @Test(timeout = 30000)
+  public void testQueueResourceLeakForCapacityScheduler() throws Exception {
+    getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    getConf()
+        .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MockRM rm1 = new MockRM(getConf());
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200, 0, true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    allocateContainers(nm1, am1, 1);
+
+    // launch the 2nd container, for testing running container transferred.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    // Preempt AM container
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
+        am1.getApplicationAttemptId());
+
+    Assert.assertTrue(!attempt1.shouldCountTowardsMaxAttemptRetry());
+
+    // AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+
+    // After app1 failed, used resource of this queue should
+    // be clean, otherwise resource leak happened.
+    if (getSchedulerType() == SchedulerType.CAPACITY) {
+      LeafQueue queue =
+          (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getMemorySize());
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getVirtualCores());
+    } else if (getSchedulerType() == SchedulerType.FAIR) {
+      FSLeafQueue queue = ((FairScheduler) scheduler).getQueueManager()
+          .getLeafQueue("root.default", false);
+      Assert
+          .assertEquals(0, queue.getGuaranteedResourceUsage().getMemorySize());
+      Assert.assertEquals(0,
+          queue.getGuaranteedResourceUsage().getVirtualCores());
+    }
+
+    rm1.stop();
+  }
 }