diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a71cc68..7445f85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -767,8 +767,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); synchronized (application) { - CSAssignment assignment = application.assignReservedContainer(node, reservedContainer, - clusterResource, schedulingMode); + CSAssignment assignment = + application.assignContainers(clusterResource, node, + currentResourceLimits, schedulingMode, reservedContainer); handleExcessReservedContainer(clusterResource, assignment); return assignment; } @@ -816,7 +817,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Try to schedule CSAssignment assignment = application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode); + currentResourceLimits, schedulingMode, null); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 00c1bb9..e2b2c4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -27,16 +27,16 @@ public class ContainerAllocation { public static final ContainerAllocation PRIORITY_SKIPPED = new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED); - + public static final ContainerAllocation APP_SKIPPED = new ContainerAllocation(null, null, AllocationState.APP_SKIPPED); public static final ContainerAllocation QUEUE_SKIPPED = new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED); - + public static final ContainerAllocation LOCALITY_SKIPPED = new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED); - + RMContainer containerToBeUnreserved; private Resource resourceToBeAllocated = Resources.none(); AllocationState state; @@ -50,26 +50,26 @@ public ContainerAllocation(RMContainer containerToBeUnreserved, this.resourceToBeAllocated = resourceToBeAllocated; this.state = state; } - + public RMContainer getContainerToBeUnreserved() { return containerToBeUnreserved; } - + public Resource getResourceToBeAllocated() { if (resourceToBeAllocated == null) { return Resources.none(); } return resourceToBeAllocated; } - + public AllocationState getAllocationState() { return state; } - + public NodeType getContainerNodeType() { return containerNodeType; } - + public Container getUpdatedContainer() { return updatedContainer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index b4168dd..051eec8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; -import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -36,6 +39,8 @@ * extensible. */ public abstract class ContainerAllocator { + private static final Log LOG = LogFactory.getLog(ContainerAllocator.class); + FiCaSchedulerApp application; final ResourceCalculator rc; final RMContext rmContext; @@ -46,27 +51,8 @@ public ContainerAllocator(FiCaSchedulerApp application, this.rc = rc; this.rmContext = rmContext; } - - /** - * preAllocation is to perform checks, etc. to see if we can/cannot allocate - * container. It will put necessary information to returned - * {@link ContainerAllocation}. - */ - abstract ContainerAllocation preAllocation( - Resource clusterResource, FiCaSchedulerNode node, - SchedulingMode schedulingMode, ResourceLimits resourceLimits, - Priority priority, RMContainer reservedContainer); - - /** - * doAllocation is to update application metrics, create containers, etc. - * According to allocating conclusion decided by preAllocation. - */ - abstract ContainerAllocation doAllocation( - ContainerAllocation allocationResult, Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority, - RMContainer reservedContainer); - - boolean checkHeadroom(Resource clusterResource, + + protected boolean checkHeadroom(Resource clusterResource, ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) { // If headroom + currentReservation < required, we cannot allocate this @@ -84,6 +70,65 @@ boolean checkHeadroom(Resource clusterResource, required); } + + protected CSAssignment getCSAssignmentFromAllocateResult( + Resource clusterResource, ContainerAllocation result) { + // Handle skipped + boolean skipped = + (result.getAllocationState() == AllocationState.APP_SKIPPED); + CSAssignment assignment = new CSAssignment(skipped); + assignment.setApplication(application); + + // Handle excess reservation + assignment.setExcessReservation(result.getContainerToBeUnreserved()); + + // If we allocated something + if (Resources.greaterThan(rc, clusterResource, + result.getResourceToBeAllocated(), Resources.none())) { + Resource allocatedResource = result.getResourceToBeAllocated(); + Container updatedContainer = result.getUpdatedContainer(); + + assignment.setResource(allocatedResource); + assignment.setType(result.getContainerNodeType()); + + if (result.getAllocationState() == AllocationState.RESERVED) { + // This is a reserved container + LOG.info("Reserved container " + " application=" + + application.getApplicationId() + " resource=" + allocatedResource + + " queue=" + this.toString() + " cluster=" + clusterResource); + assignment.getAssignmentInformation().addReservationDetails( + updatedContainer.getId(), + application.getCSLeafQueue().getQueuePath()); + assignment.getAssignmentInformation().incrReservations(); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + allocatedResource); + assignment.setFulfilledReservation(true); + } else { + // This is a new container + // Inform the ordering policy + LOG.info("assignedContainer" + " application attempt=" + + application.getApplicationAttemptId() + " container=" + + updatedContainer.getId() + " queue=" + this + " clusterResource=" + + clusterResource); + + application + .getCSLeafQueue() + .getOrderingPolicy() + .containerAllocated(application, + application.getRMContainer(updatedContainer.getId())); + + assignment.getAssignmentInformation().addAllocationDetails( + updatedContainer.getId(), + application.getCSLeafQueue().getQueuePath()); + assignment.getAssignmentInformation().incrAllocations(); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + allocatedResource); + } + } + + return assignment; + } + /** * allocate needs to handle following stuffs: * @@ -96,20 +141,7 @@ boolean checkHeadroom(Resource clusterResource, * container, this will also update metrics * */ - public ContainerAllocation allocate(Resource clusterResource, + public abstract CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, Priority priority, - RMContainer reservedContainer) { - ContainerAllocation result = - preAllocation(clusterResource, node, schedulingMode, - resourceLimits, priority, reservedContainer); - - if (AllocationState.ALLOCATED == result.state - || AllocationState.RESERVED == result.state) { - result = doAllocation(result, clusterResource, node, - schedulingMode, priority, reservedContainer); - } - - return result; - } + ResourceLimits resourceLimits, RMContainer reservedContainer); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 6effcd3..8a31b4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -154,7 +155,6 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, return null; } - @Override ContainerAllocation preAllocation(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, Priority priority, @@ -295,7 +295,7 @@ private ContainerAllocation assignOffSwitchContainers( schedulingMode, currentResoureLimits); } - return ContainerAllocation.QUEUE_SKIPPED; + return ContainerAllocation.APP_SKIPPED; } private ContainerAllocation assignContainersOnNode(Resource clusterResource, @@ -400,7 +400,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); - return ContainerAllocation.QUEUE_SKIPPED; + // Skip the locality when it's not a OFF_SWITH request + return type == NodeType.OFF_SWITCH ? ContainerAllocation.APP_SKIPPED + : ContainerAllocation.LOCALITY_SKIPPED; } assert Resources.greaterThan( @@ -457,7 +459,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, // continue)). If we failed to unreserve some resource, we can't // continue. if (null == unreservedContainer) { - return ContainerAllocation.QUEUE_SKIPPED; + // Skip the locality when it's not a OFF_SWITH request + return type == NodeType.OFF_SWITCH ? ContainerAllocation.APP_SKIPPED + : ContainerAllocation.LOCALITY_SKIPPED; } } } @@ -480,7 +484,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("we needed to unreserve to be able to allocate"); } - return ContainerAllocation.QUEUE_SKIPPED; + // Skip the locality when it's not a OFF_SWITH request + return type == NodeType.OFF_SWITCH ? ContainerAllocation.APP_SKIPPED + : ContainerAllocation.LOCALITY_SKIPPED; } } @@ -490,7 +496,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, result.containerNodeType = type; return result; } - return ContainerAllocation.QUEUE_SKIPPED; + // Skip the locality when it's not a OFF_SWITH request + return type == NodeType.OFF_SWITCH ? ContainerAllocation.APP_SKIPPED + : ContainerAllocation.LOCALITY_SKIPPED; } } @@ -578,7 +586,6 @@ private ContainerAllocation handleNewContainerAllocation( return allocationResult; } - @Override ContainerAllocation doAllocation(ContainerAllocation allocationResult, Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority, @@ -591,7 +598,7 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, // something went wrong getting/creating the container if (container == null) { LOG.warn("Couldn't get container for allocation!"); - return ContainerAllocation.QUEUE_SKIPPED; + return ContainerAllocation.APP_SKIPPED; } if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { @@ -626,4 +633,63 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, return allocationResult; } + + private ContainerAllocation allocate(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority, + RMContainer reservedContainer) { + ContainerAllocation result = + preAllocation(clusterResource, node, schedulingMode, resourceLimits, + priority, reservedContainer); + + if (AllocationState.ALLOCATED == result.state + || AllocationState.RESERVED == result.state) { + result = + doAllocation(result, clusterResource, node, schedulingMode, priority, + reservedContainer); + } + + return result; + } + + @Override + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, + RMContainer reservedContainer) { + if (reservedContainer == null) { + // Check if application needs more resource, skip if it doesn't need more. + if (!application.hasPendingResourceRequest(rc, + node.getPartition(), clusterResource, schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-label=" + node.getPartition()); + } + return CSAssignment.SKIP_ASSIGNMENT; + } + + // Schedule in priority order + for (Priority priority : application.getPriorities()) { + ContainerAllocation result = + allocate(clusterResource, node, schedulingMode, resourceLimits, + priority, reservedContainer); + + AllocationState allocationState = result.getAllocationState(); + if (allocationState == AllocationState.PRIORITY_SKIPPED) { + continue; + } + return getCSAssignmentFromAllocateResult(clusterResource, result); + } + + // We will reach here if we skipped all priorities of the app, so we will + // skip the app. + return CSAssignment.SKIP_ASSIGNMENT; + } else { + ContainerAllocation result = + allocate(clusterResource, node, schedulingMode, resourceLimits, + reservedContainer.getReservedPriority(), reservedContainer); + return getCSAssignmentFromAllocateResult(clusterResource, result); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index d75b2c3..50d6a87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -57,10 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -286,7 +284,7 @@ public synchronized Resource getTotalPendingRequests() { return ret; } - public synchronized void addPreemptContainer(ContainerId cont){ + public synchronized void addPreemptContainer(ContainerId cont) { // ignore already completed containers if (liveContainers.containsKey(cont)) { containersToPreempt.add(cont); @@ -436,112 +434,19 @@ public RMContainer findNodeToUnreserve(Resource clusterResource, public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - - private CSAssignment getCSAssignmentFromAllocateResult( - Resource clusterResource, ContainerAllocation result) { - // Handle skipped - boolean skipped = - (result.getAllocationState() == AllocationState.APP_SKIPPED); - CSAssignment assignment = new CSAssignment(skipped); - assignment.setApplication(this); - - // Handle excess reservation - assignment.setExcessReservation(result.getContainerToBeUnreserved()); - - // If we allocated something - if (Resources.greaterThan(rc, clusterResource, - result.getResourceToBeAllocated(), Resources.none())) { - Resource allocatedResource = result.getResourceToBeAllocated(); - Container updatedContainer = result.getUpdatedContainer(); - - assignment.setResource(allocatedResource); - assignment.setType(result.getContainerNodeType()); - - if (result.getAllocationState() == AllocationState.RESERVED) { - // This is a reserved container - LOG.info("Reserved container " + " application=" + getApplicationId() - + " resource=" + allocatedResource + " queue=" - + this.toString() + " cluster=" + clusterResource); - assignment.getAssignmentInformation().addReservationDetails( - updatedContainer.getId(), getCSLeafQueue().getQueuePath()); - assignment.getAssignmentInformation().incrReservations(); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - allocatedResource); - assignment.setFulfilledReservation(true); - } else { - // This is a new container - // Inform the ordering policy - LOG.info("assignedContainer" + " application attempt=" - + getApplicationAttemptId() + " container=" - + updatedContainer.getId() + " queue=" + this + " clusterResource=" - + clusterResource); - - getCSLeafQueue().getOrderingPolicy().containerAllocated(this, - getRMContainer(updatedContainer.getId())); - - assignment.getAssignmentInformation().addAllocationDetails( - updatedContainer.getId(), getCSLeafQueue().getQueuePath()); - assignment.getAssignmentInformation().incrAllocations(); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - allocatedResource); - } - } - - return assignment; - } public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, RMContainer reservedContainer) { if (LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + getApplicationId()); showRequests(); } - // Check if application needs more resource, skip if it doesn't need more. - if (!hasPendingResourceRequest(rc, - node.getPartition(), clusterResource, schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip app_attempt=" + getApplicationAttemptId() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-label=" + node.getPartition()); - } - return CSAssignment.SKIP_ASSIGNMENT; - } - synchronized (this) { - // Schedule in priority order - for (Priority priority : getPriorities()) { - ContainerAllocation allocationResult = - containerAllocator.allocate(clusterResource, node, - schedulingMode, currentResourceLimits, priority, null); - - // If it's a skipped allocation - AllocationState allocationState = allocationResult.getAllocationState(); - - if (allocationState == AllocationState.PRIORITY_SKIPPED) { - continue; - } - return getCSAssignmentFromAllocateResult(clusterResource, - allocationResult); - } + return containerAllocator.assignContainers(clusterResource, node, + schedulingMode, currentResourceLimits, reservedContainer); } - - // We will reach here if we skipped all priorities of the app, so we will - // skip the app. - return CSAssignment.SKIP_ASSIGNMENT; - } - - - public synchronized CSAssignment assignReservedContainer( - FiCaSchedulerNode node, RMContainer rmContainer, - Resource clusterResource, SchedulingMode schedulingMode) { - ContainerAllocation result = - containerAllocator.allocate(clusterResource, node, - schedulingMode, new ResourceLimits(Resources.none()), - rmContainer.getReservedPriority(), rmContainer); - - return getCSAssignmentFromAllocateResult(clusterResource, result); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index f419528..a7df274 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -624,16 +624,9 @@ public void testUserLimits() throws Exception { final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = - new FiCaSchedulerApp(appAttemptId_1, user_0, a, - a.getActiveUsersManager(), spyRMContext); - a.submitApplicationAttempt(app_1, user_0); // same user - - final ApplicationAttemptId appAttemptId_2 = - TestUtils.getMockApplicationAttemptId(2, 0); - FiCaSchedulerApp app_2 = - new FiCaSchedulerApp(appAttemptId_2, user_1, a, + new FiCaSchedulerApp(appAttemptId_1, user_1, a, a.getActiveUsersManager(), spyRMContext); - a.submitApplicationAttempt(app_2, user_1); + a.submitApplicationAttempt(app_1, user_1); // different user // Setup some nodes String host_0 = "127.0.0.1"; @@ -649,7 +642,7 @@ public void testUserLimits() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true, priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( @@ -664,39 +657,38 @@ public void testUserLimits() throws Exception { a.setUserLimit(50); a.setUserLimitFactor(2); - // Now, only user_0 should be active since he is the only one with - // outstanding requests - assertEquals("There should only be 1 active user!", - 1, a.getActiveUsersManager().getNumActiveUsers()); - - // This commented code is key to test 'activeUsers'. - // It should fail the test if uncommented since - // it would increase 'activeUsers' to 2 and stop user_2 - // Pre MAPREDUCE-3732 this test should fail without this block too -// app_2.updateResourceRequests(Collections.singletonList( -// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority, -// recordFactory))); + // There're two active users + assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); // 1 container to user_0 a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - assertEquals(2*GB, a.getUsedResources().getMemory()); - assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(3*GB, a.getUsedResources().getMemory()); + assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); - // Again one to user_0 since he hasn't exceeded user limit yet + // Allocate one container to app_1. Even if app_0 + // submit earlier, it cannot get this container assigned since user_0 + // exceeded user-limit already. a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - assertEquals(3*GB, a.getUsedResources().getMemory()); - assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(4*GB, a.getUsedResources().getMemory()); + assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); - // One more to user_0 since he is the only active user + // Allocate one container to app_0, before allocating this container, + // user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <= + // user-limit. a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - assertEquals(4*GB, a.getUsedResources().getMemory()); - assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); - assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(7*GB, a.getUsedResources().getMemory()); + assertEquals(6*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + // app_0 doesn't have outstanding resources, there's only one active user. + assertEquals("There should only be 1 active user!", + 1, a.getActiveUsersManager().getNumActiveUsers()); + } @Test @@ -2570,6 +2562,96 @@ public void testFairAssignment() throws Exception { Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); } + + @Test + public void testLocalityDelaySkipsApplication() throws Exception { + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + // User + String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes and racks + String host_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + + String host_1 = "127.0.0.2"; + String rack_1 = "rack_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + + String host_2 = "127.0.0.3"; + String rack_2 = "rack_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + + final int numNodes = 3; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests and submit + // App0 has node local request for host_0/host_1, and app1 has node local + // request for host2. + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_0, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_0, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_1, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_1, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + List app_1_requests_0 = new ArrayList(); + app_1_requests_0.add( + TestUtils.createResourceRequest(host_2, 1*GB, 1, + true, priority, recordFactory)); + app_1_requests_0.add( + TestUtils.createResourceRequest(rack_2, 1*GB, 1, + true, priority, recordFactory)); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // one extra + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + // Start testing... + // When doing allocation, even if app_0 submit earlier than app_1, app_1 can + // still get allocated because app_0 is waiting for node-locality-delay + CSAssignment assignment = null; + + // Check app_0's scheduling opportunities increased and app_1 get allocated + assignment = a.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); + assertEquals(1, app_0.getSchedulingOpportunities(priority)); + assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(0, app_0.getLiveContainers().size()); + assertEquals(1, app_1.getLiveContainers().size()); + } private List createListOfApps(int noOfApps, String user, LeafQueue defaultQueue) {