diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 721eb36..c81775b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -50,6 +50,12 @@ public ResourceLimits(Resource limit, Resource amountNeededUnreserve) { this.limit = limit; } + public ResourceLimits(ResourceLimits limits) { + this.amountNeededUnreserve = limits.amountNeededUnreserve; + this.limit = limits.limit; + this.headroom = limits.headroom; + } + public Resource getLimit() { return limit; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 6406efe..9daa619 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -34,36 +34,47 @@ public static final CSAssignment NULL_ASSIGNMENT = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + public static final CSAssignment SKIP_ASSIGNMENT = + new CSAssignment(SkippedType.OTHER_SKIPPED); private Resource resource; private NodeType type; private RMContainer excessReservation; private FiCaSchedulerApp application; - private final boolean skipped; + private SkippedType skipped; + + /** + * Reason for the queue to get skipped. + */ + public enum SkippedType { + NO_SKIPPED, + QUEUE_LIMIT_SKIPPED, + OTHER_SKIPPED + } + private boolean fulfilledReservation; private final AssignmentInformation assignmentInformation; private boolean increaseAllocation; private List containersToKill; public CSAssignment(Resource resource, NodeType type) { - this(resource, type, null, null, false, false); + this(resource, type, null, null, SkippedType.NO_SKIPPED, false); } public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) { this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL, - excessReservation, application, false, false); + excessReservation, application, SkippedType.NO_SKIPPED, false); } - public CSAssignment(boolean skipped) { + public CSAssignment(SkippedType skipped) { this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped, false); } public CSAssignment(Resource resource, NodeType type, RMContainer excessReservation, FiCaSchedulerApp application, - boolean skipped, boolean fulfilledReservation) { + SkippedType skipped, boolean fulfilledReservation) { this.resource = resource; this.type = type; this.excessReservation = excessReservation; @@ -73,6 +84,16 @@ public CSAssignment(Resource resource, NodeType type, this.assignmentInformation = new AssignmentInformation(); } + public CSAssignment(CSAssignment csAssignment) { + this.resource = csAssignment.resource; + this.type = csAssignment.type; + this.excessReservation = csAssignment.excessReservation; + this.application = csAssignment.application; + this.skipped = csAssignment.skipped; + this.fulfilledReservation = csAssignment.fulfilledReservation; + this.assignmentInformation = csAssignment.assignmentInformation; + } + public Resource getResource() { return resource; } @@ -105,10 +126,14 @@ public void setExcessReservation(RMContainer rmContainer) { excessReservation = rmContainer; } - public boolean getSkipped() { + public SkippedType getSkippedType() { return skipped; } - + + public void setSkippedType(SkippedType skippedType) { + this.skipped = skippedType; + } + @Override public String toString() { String ret = "resource:" + resource.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..558446a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -972,8 +973,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Done return assignment; - } else if (assignment.getSkipped()) { + } else if (assignment.getSkippedType() + == CSAssignment.SkippedType.OTHER_SKIPPED) { application.updateNodeInfoForAMDiagnostics(node); + } else if(assignment.getSkippedType() + == CSAssignment.SkippedType.QUEUE_LIMIT_SKIPPED) { + return assignment; } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..dd781fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -474,6 +474,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, " cluster=" + clusterResource); } else { + assignment.setSkippedType(assignedToChild.getSkippedType()); break; } @@ -568,9 +569,10 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, private synchronized CSAssignment assignContainersToChildQueues( Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, SchedulingMode schedulingMode) { - CSAssignment assignment = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - + CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; + + ResourceLimits parentLimits = new ResourceLimits(limits); + CSAssignment.SkippedType skippedType = CSAssignment.SkippedType.NO_SKIPPED; printChildQueues(); // Try to assign to most 'under-served' sub-queue @@ -584,7 +586,8 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); + getResourceLimitsOfChild(childQueue, cluster, parentLimits, + node.getPartition()); assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); @@ -611,9 +614,29 @@ private synchronized CSAssignment assignContainersToChildQueues( } } break; + } else { + if (assignment.getSkippedType() + == CSAssignment.SkippedType.QUEUE_LIMIT_SKIPPED) { + skippedType = CSAssignment.SkippedType.QUEUE_LIMIT_SKIPPED; + Resource newParentResource = Resources.subtract( + parentLimits.getLimit(), Resources.max(resourceCalculator, + cluster, childLimits.getHeadroom(), Resources.none())); + LOG.debug("Update parentLimits for " + this.getQueueName() + + " from " + parentLimits.getLimit().toString() + " to " + + newParentResource.toString() + " and childQueue=" + + childQueue.getQueueName()); + parentLimits.setLimit(newParentResource); + } } } - + + if(!skippedType.equals(assignment.getSkippedType())) { + //Make a copy of the assignment to avoid changing when + // returned assignment == NULL_ASSIGNMENT + CSAssignment csAssignment = new CSAssignment(assignment); + csAssignment.setSkippedType(skippedType); + return csAssignment; + } return assignment; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..b58e022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -55,8 +55,10 @@ protected CSAssignment getCSAssignmentFromAllocateResult( Resource clusterResource, ContainerAllocation result, RMContainer rmContainer) { // Handle skipped - boolean skipped = - (result.getAllocationState() == AllocationState.APP_SKIPPED); + CSAssignment.SkippedType skipped = + (result.getAllocationState() == AllocationState.APP_SKIPPED) ? + CSAssignment.SkippedType.OTHER_SKIPPED : + CSAssignment.SkippedType.NO_SKIPPED; CSAssignment assignment = new CSAssignment(skipped); assignment.setApplication(application); @@ -110,6 +112,11 @@ protected CSAssignment getCSAssignmentFromAllocateResult( } assignment.setContainersToKill(result.getToKillContainers()); + } else { + if (result.getAllocationState() == AllocationState.QUEUE_SKIPPED) { + assignment.setSkippedType( + CSAssignment.SkippedType.QUEUE_LIMIT_SKIPPED); + } } return assignment; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 25e5824..ba2d7b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -70,7 +70,7 @@ private CSAssignment createReservedIncreasedCSAssignment( SchedContainerChangeRequest request) { CSAssignment assignment = new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, - application, false, false); + application, CSAssignment.SkippedType.NO_SKIPPED, false); Resources.addTo(assignment.getAssignmentInformation().getReserved(), request.getDeltaCapacity()); assignment.getAssignmentInformation().incrReservations(); @@ -87,7 +87,7 @@ private CSAssignment createSuccessfullyIncreasedCSAssignment( SchedContainerChangeRequest request, boolean fromReservation) { CSAssignment assignment = new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, - application, false, fromReservation); + application, CSAssignment.SkippedType.NO_SKIPPED, fromReservation); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), request.getDeltaCapacity()); assignment.getAssignmentInformation().incrAllocations(); @@ -308,7 +308,8 @@ public CSAssignment assignContainers(Resource clusterResource, // Try to allocate the increase request assigned = allocateIncreaseRequest(node, clusterResource, increaseRequest); - if (!assigned.getSkipped()) { + if (assigned.getSkippedType() + == CSAssignment.SkippedType.NO_SKIPPED) { // When we don't skip this request, which means we either allocated // OR reserved this request. We will break break; @@ -324,7 +325,8 @@ public CSAssignment assignContainers(Resource clusterResource, } // We may have allocated something - if (assigned != null && !assigned.getSkipped()) { + if (assigned != null && assigned.getSkippedType() + == CSAssignment.SkippedType.NO_SKIPPED) { break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index e64d6bb..be16472 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -86,6 +86,13 @@ public static Resource newResource(int mem) { return rs; } + public static Resource newResource(int mem, int vCores) { + Resource rs = recordFactory.newRecordInstance(Resource.class); + rs.setMemorySize(mem); + rs.setVirtualCores(vCores); + return rs; + } + public static Resource newUsedResource(Resource total) { Resource rs = recordFactory.newRecordInstance(Resource.class); rs.setMemorySize((int)(Math.random() * total.getMemorySize())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 7c34292..98c85d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. + ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -164,6 +166,12 @@ private static final String B3 = B + ".b3"; private static float A_CAPACITY = 10.5f; private static float B_CAPACITY = 89.5f; + private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; + private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2"; + private static final String X1 = P1 + ".x1"; + private static final String X2 = P1 + ".x2"; + private static final String Y1 = P2 + ".y1"; + private static final String Y2 = P2 + ".y2"; private static float A1_CAPACITY = 30; private static float A2_CAPACITY = 70; private static float B1_CAPACITY = 79.2f; @@ -409,7 +417,52 @@ private CapacitySchedulerConfiguration setupQueueConfiguration( LOG.info("Setup top-level queues a and b"); return conf; } - + + private CapacitySchedulerConfiguration setupBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setUserLimitFactor(A, 100); + conf.setUserLimitFactor(B, 100); + conf.setMaximumCapacity(A, 100); + conf.setMaximumCapacity(B, 100); + LOG.info("Setup top-level queues a and b"); + return conf; + } + + private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"p1", "p2"}); + + conf.setCapacity(P1, 50f); + conf.setMaximumCapacity(P1, 50f); + conf.setCapacity(P2, 50f); + conf.setMaximumCapacity(P2, 100f); + // Define 2nd-level queues + conf.setQueues(P1, new String[] {"x1", "x2"}); + conf.setCapacity(X1, 80f); + conf.setMaximumCapacity(X1, 100f); + conf.setUserLimitFactor(X1, 2f); + conf.setCapacity(X2, 20f); + conf.setMaximumCapacity(X2, 100f); + conf.setUserLimitFactor(X2, 2f); + + conf.setQueues(P2, new String[]{"y1", "y2"}); + conf.setCapacity(Y1, 80f); + conf.setUserLimitFactor(Y1, 2f); + conf.setCapacity(Y2, 20f); + conf.setUserLimitFactor(Y2, 2f); + return conf; + } + @Test public void testMaximumCapacitySetup() { float delta = 0.0000001f; @@ -3373,4 +3426,237 @@ public void handle(Event event) { Assert.assertEquals(availableResource.getMemorySize(), 0); Assert.assertEquals(availableResource.getVirtualCores(), 0); } + + @Test + public void testCSReservationWithRootUnblocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setResourceComparator(DominantResourceCalculator.class); + setupOtherBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ParentQueue q = (ParentQueue) cs.getQueue("p1"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB, 8), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB, 8), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "x1", "userX1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "x2", "userX2"); + ApplicationAttemptId appAttemptId3 = + appHelper(rm, cs, 100, 3, "y1", "userY1"); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest y1Req = null; + ResourceRequest x1Req = null; + ResourceRequest x2Req = null; + for(int i=0; i < 4; i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("Y1 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("y1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + for(int i=0; i < 7; i++) { + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("X1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("x1").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + x2Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(x2Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 0", 0, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + //this assign should fail + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + assertEquals("X1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("x1").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + //this should get thru + for (int i=0; i < 4; i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + //Free a container from X1 + ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2); + cs.handle(new ContainerExpiredSchedulerEvent(containerId)); + + //Schedule pending request + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 2 GB", 2 * GB, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + assertEquals("Root Used Resource should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + @Test + public void testCSQueueBlocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue q = (LeafQueue) cs.getQueue("a"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + //add app begin + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "a", "user1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "b", "user2"); + //add app end + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + //This will allocate for app1 + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + ResourceRequest r2 = null; + for (int i =0; i < 13; i++) { + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 2 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + + cs.allocate(appAttemptId2, Collections.singletonList(r2), + Collections.emptyList(), null, null, null, null); + CapacityScheduler.schedule(cs); + //Check blocked Resource + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10); + ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11); + + cs.handle(new ContainerExpiredSchedulerEvent(containerId1)); + cs.handle(new ContainerExpiredSchedulerEvent(containerId2)); + + CapacityScheduler.schedule(cs); + assertEquals("A Used Resource should be 2 GB", 4 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 12 GB", 12 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + assertEquals("Used Resource on Root should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs, + int clusterTs, int appId, String queue, + String user) { + ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, appId); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt1.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + when(attempt1.getSubmissionContext()).thenReturn(submissionContext); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, queue, user); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + return appAttemptId1; + } }