diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 744776a..44aad67 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1162,10 +1162,13 @@ private void attemptToAssignReservedResourcesOrPromoteOpportunisticContainers(
     for (RMContainer rmContainer : promoted) {
       FSAppAttempt appAttempt = getSchedulerApp(
           rmContainer.getApplicationAttemptId());
-      appAttempt.opportunisticContainerPromoted(rmContainer);
-
-      promotion.put(rmContainer.getContainer(),
-          ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+      // the app attempt may already have been removed, e.g. when the
+      // application is killed while its containers are up for promotion
+      if (appAttempt != null) {
+        appAttempt.opportunisticContainerPromoted(rmContainer);
+        promotion.put(rmContainer.getContainer(),
+            ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+      }
     }
 
     if (!promotion.isEmpty()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5847ca9..ec4f082 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -119,6 +119,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -3948,6 +3949,113 @@ public void testOpportunisticContainerPromotionWithPostReservation()
   }
 
   @Test
+  public void testKillingApplicationWithOpportunisticContainersAssigned()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // the third app attempt, which has an opportunistic container assigned,
+      // is killed.
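+      // Removing the app while its opportunistic container is still tracked
+      // exercises the null check added in FairScheduler's promotion path.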
+      scheduler.handle(new AppRemovedSchedulerEvent(
+          appAttempt3.getApplicationId(), RMAppState.KILLED));
+
+      // the first GUARANTEED container finishes; the node update that follows
+      // must not fail promoting the removed application's container
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's
     conf.set(FairSchedulerConfiguration.ALLOC_FILE, ALLOC_FILE);