diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index f2da1fe..bc83286 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1360,6 +1360,9 @@ public long getMaximumApplicationLifetime(String queueName) { return -1; } + @VisibleForTesting + public abstract void killContainer(RMContainer container); + /** * Update internal state of the scheduler. This can be useful for scheduler * implementations that maintain some state that needs to be periodically diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d91aa55..871b8c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1759,6 +1759,12 @@ public void markContainerForPreemption(ApplicationAttemptId aid, } } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + markContainerForKillable(container); + } + public void markContainerForKillable( RMContainer killableContainer) { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 37f583e..178087a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -783,6 +783,17 @@ public Resource getNormalizedResource(Resource requestedResource) { incrAllocation); } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus( + container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + + LOG.info("Killing container " + container); + completedContainer( + container, status, RMContainerEventType.KILL); + } + @Override public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 185d426..1fa7e2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -975,6 +975,16 @@ protected synchronized void nodeUpdate(RMNode nm) { updateAvailableResourcesMetrics(); } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); + } + @Override public synchronized void recoverContainersOnNode( List containerReports, RMNode nm) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 9d0d879..c43069b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -51,10 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -376,8 +374,6 @@ public void testNMTokensRebindOnAMRestart() throws Exception { @Test(timeout = 100000) public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -389,12 +385,12 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the next attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -414,7 +410,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2)); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -503,8 +499,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { @Test(timeout = 100000) public void testMaxAttemptOneMeansOne() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -516,12 +510,12 @@ public void testMaxAttemptOneMeansOne() throws Exception { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -539,8 +533,6 @@ public void testMaxAttemptOneMeansOne() throws Exception { @Test(timeout = 60000) public void testPreemptedAMRestartOnRMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); @@ -556,8 +548,8 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); @@ -577,7 +569,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { // Forcibly preempt the am container; amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -619,8 +611,6 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { public void testRMRestartOrFailoverNotCountedForAMFailures() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); @@ -631,8 +621,8 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() MockRM rm1 = new MockRM(conf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -694,8 +684,6 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() @Test (timeout = 120000) public void testRMAppAttemptFailuresValidityInterval() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);