diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 60257b1..4b21526 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -671,7 +671,8 @@ public void showRequests() {
   }
 
   public Resource getCurrentConsumption() {
-    return attemptResourceUsage.getUsed();
+    return Resources.add(attemptResourceUsage.getUsed(),
+        attemptOpportunisticResourceUsage.getUsed());
   }
 
   private Container updateContainerAndNMToken(RMContainer rmContainer,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 7da01fd..731ac18 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -276,6 +276,18 @@ private boolean containerResourceAllocated(Resource allocated,
     return true;
   }
 
+  public void promoteOpportunisticContainer(RMContainer rmContainer) {
+    assert (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC);
+
+    Resource resource = rmContainer.getContainer().getResource();
+    Resources.subtractFrom(allocatedResourceOpportunistic, resource);
+    numOpportunisticContainers--;
+
+    Resources.addTo(allocatedResourceGuaranteed, resource);
+    numGuaranteedContainers++;
+    Resources.subtractFrom(unallocatedResource, resource);
+  }
+
   /**
    * Get resources that are not allocated to GUARANTEED containers on the node.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 32998b6..4c875f2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -505,6 +505,38 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
   }
 
   /**
+   * Try to promote an OPPORTUNISTIC container of this FSAppAttempt.
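+   * The promotion succeeds only when the node has enough unallocated
+   * resources left to serve the container as a GUARANTEED one.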
+   * @param rmContainer the OPPORTUNISTIC container to promote
+   * @param node the node on which the container is allocated
+   * @return true if the container is promoted, false otherwise
+   */
+  public boolean tryToPromoteOpportunisticContainer(
+      RMContainer rmContainer, FSSchedulerNode node) {
+    // only an OPPORTUNISTIC container can be promoted
+    assert (ExecutionType.OPPORTUNISTIC == rmContainer.getExecutionType());
+    // the container to be promoted must be allocated on the given node
+    assert (rmContainer.getNodeId().equals(node.getNodeID()));
+    // the container to be promoted must belong to the current app attempt
+    assert (rmContainer.getApplicationAttemptId().equals(
+        getApplicationAttemptId()));
+
+    boolean containerPromoted = false;
+
+    Resource resource = rmContainer.getContainer().getResource();
+    if (Resources.fitsIn(resource, node.getUnallocatedResource())) {
+      node.promoteOpportunisticContainer(rmContainer);
+      attemptOpportunisticResourceUsage.decUsed(resource);
+      attemptResourceUsage.incUsed(resource);
+      getQueue().incUsedGuaranteedResource(resource);
+      containerPromoted = true;
+    }
+
+    return containerPromoted;
+  }
+
+  /**
    * Should be called when the scheduler assigns a container at a higher
    * degree of locality than the current threshold. Reset the allowed locality
    * level to a higher degree of locality.
@@ -1160,7 +1190,7 @@ private boolean isValidReservation(FSSchedulerNode node) {
    *
    * @param node
    *          Node that the application has an existing reservation on
-   * @return whether the reservation on the given node is valid.
+   * @return true if the reservation is turned into an allocation
    */
   boolean assignReservedContainer(FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
@@ -1187,8 +1217,10 @@ boolean assignReservedContainer(FSSchedulerNode node) {
     if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
         node.getUnallocatedResource())) {
       assignContainer(node, false, true);
+
+      return true;
     }
-    return true;
+    return false;
   }
 
   /**
@@ -1356,7 +1388,7 @@ public Resource getMaxShare() {
 
   @Override
   public Resource getGuaranteedResourceUsage() {
-    return getCurrentConsumption();
+    return attemptResourceUsage.getUsed();
   }
 
   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index a53dda4..fb0c2e0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -35,10 +36,14 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -66,6 +71,13 @@
   // slated for preemption
   private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
 
+  // The set of containers that need to be handled before resources
+  // available on the node can be assigned to resource requests.
+  // This is a FIFO queue of reserved and opportunistic containers
+  // on the node.
+  private final LinkedHashSet<RMContainer> priorityContainers =
+      new LinkedHashSet<>(1);
+
   @VisibleForTesting
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
@@ -124,6 +136,7 @@ public synchronized void reserveResource(
           + application.getApplicationId());
     }
     setReservedContainer(container);
+    priorityContainers.add(container);
     this.reservedAppSchedulable = (FSAppAttempt) application;
   }
 
@@ -142,11 +155,17 @@ public synchronized void unreserveResource(
           " for application " + reservedApplication.getApplicationId() +
           " on node " + this);
     }
-
+    priorityContainers.remove(getReservedContainer());
     setReservedContainer(null);
     this.reservedAppSchedulable = null;
   }
 
+  @Override
+  public void promoteOpportunisticContainer(RMContainer rmContainer) {
+    super.promoteOpportunisticContainer(rmContainer);
+    priorityContainers.remove(rmContainer);
+  }
+
   synchronized FSAppAttempt getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
@@ -274,6 +293,15 @@ protected synchronized void allocateContainer(RMContainer rmContainer,
     } else {
       LOG.error("Allocated empty container" + rmContainer.getContainerId());
     }
+
+    // keep track of opportunistic containers allocated so that we can promote
+    // them before we assign available resources to resource requests.
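+    // A container is removed from this set again once it gets promoted
+    // in promoteOpportunisticContainer() above.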
+    if (ExecutionType.OPPORTUNISTIC.equals(
+        rmContainer.getContainer().getExecutionType())) {
+      priorityContainers.add(rmContainer);
+    }
   }
 
   /**
@@ -292,4 +318,8 @@ public synchronized void releaseContainer(ContainerId containerId,
       containersForPreemption.remove(container);
     }
   }
+
+  public List<RMContainer> getPriorityContainers() {
+    return new ArrayList<>(priorityContainers);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index e6c0d9d..b49c3bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -69,6 +70,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -102,6 +104,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1118,7 +1121,7 @@ void attemptScheduling(FSSchedulerNode node) {
     // when C does not qualify for preemption itself.
     attemptToAssignPreemptedResources(node);
-    boolean validReservation = attemptToAssignReservedResources(node);
+    boolean validReservation = beforeAttemptToAssignGuaranteedResources(node);
     if (!validReservation) {
       // only attempt to assign GUARANTEED containers if there is no
       // reservation on the node because
@@ -1137,6 +1140,45 @@
     }
   }
 
+  /**
+   * Handle the reserved and OPPORTUNISTIC containers on the node in FIFO
+   * order before any resources are assigned to new GUARANTEED containers.
+   */
+  private boolean beforeAttemptToAssignGuaranteedResources(
+      FSSchedulerNode node) {
+    boolean assigned = false;
+    Map<Container, ContainerUpdateType> promotion = new HashMap<>(0);
+
+    for (RMContainer rmContainer : node.getPriorityContainers()) {
+      boolean isReservedContainer =
+          rmContainer.getReservedSchedulerKey() != null;
+      if (isReservedContainer) {
+        // attempt to assign resources that have been reserved
+        assigned = attemptToAssignReservedResources(node);
+      } else {
+        FSAppAttempt appAttempt = getSchedulerApp(
+            rmContainer.getApplicationAttemptId());
+        // attempt to promote the OPPORTUNISTIC container
+        if (appAttempt.tryToPromoteOpportunisticContainer(rmContainer, node)) {
+          assigned = true;
+          promotion.put(rmContainer.getContainer(),
+              ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+        }
+      }
+      if (!assigned) {
+        // break out of the loop because assigned being false
+        // indicates no more resources are available
+        break;
+      }
+    }
+
+    if (!promotion.isEmpty()) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeUpdateContainerEvent(node.getNodeID(), promotion));
+    }
+    return assigned;
+  }
+
   /**
    * Assign the reserved resource to the application that have reserved it.
    */
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5878ccd..e51c759 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3353,6 +3353,585 @@ public void testMaxOverallocationPerNode() throws Exception {
     }
   }
 
+  /**
+   * Test promotion of a single OPPORTUNISTIC container when no resources are
+   * reserved on the node where the container is allocated.
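+   * Once a GUARANTEED container finishes and frees up resources, the next
+   * node update is expected to promote the OPPORTUNISTIC container.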
+   */
+  @Test
+  public void testSingleOpportunisticContainerPromotionWithoutReservation()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          "yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of two OPPORTUNISTIC containers when no resources are
+   * reserved on the node where the containers are allocated.
+   */
+  @Test
+  public void testMultipleOpportunisticContainerPromotionWithoutReservation()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1536, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1536, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // node utilization is low after the third container runs on the node
+      ContainerStatus container3Status = ContainerStatus.newInstance(
+          allocatedContainers3.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(container3Status),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(2000, 0, 0.2f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers4.get(0).getExecutionType());
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the first OPPORTUNISTIC container should be promoted
+      assertEquals(1536, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      // the second OPPORTUNISTIC container should not be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the second OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          "yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of an OPPORTUNISTIC container when there are resources
+   * reserved before the container is allocated. The scheduler should
+   * satisfy the reservation first before it promotes the OPPORTUNISTIC
+   * container when resources are released.
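+   * The OPPORTUNISTIC container must stay OPPORTUNISTIC as long as the
+   * reservation that was made before it remains unsatisfied.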
+   */
+  @Test
+  public void testOpportunisticContainerPromotionWithPriorReservation()
+      throws Exception {
+
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that opts out of oversubscription
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(2000, "queue2", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 0);
+      // verify that a reservation is made for the third request
+      assertTrue("A reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers4.get(0).getExecutionType());
+      assertTrue("A reservation should still be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the reserved container of the request that opted out of
+      // oversubscription should now be satisfied with a GUARANTEED container
+      assertEquals(2000, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("The reservation for the third request should be canceled",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+      // the OPPORTUNISTIC container should not be promoted given the released
+      // resources are taken by handling the reservation
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          "yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of an OPPORTUNISTIC container when there are resources
+   * reserved after the container is allocated. The scheduler should
+   * promote the OPPORTUNISTIC container before it satisfies the reservation
+   * when resources are released.
+   */
+  @Test
+  public void testOpportunisticContainerPromotionWithPostReservation()
+      throws Exception {
+
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // create another scheduling request that opts out of oversubscription
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(2000, "queue3", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 0);
+      // verify that a reservation is made for the fourth request
+      assertTrue("A reservation should be made for the fourth request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertTrue("A reservation should still be made for the fourth request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the reserved container of the request that opted out of
+      // oversubscription should now be satisfied with a GUARANTEED container
+      assertEquals(2000, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers4.get(0).getExecutionType());
+      assertTrue("The reservation for the fourth request should be canceled",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          "yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's