diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 6406efe..5307206 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -46,6 +46,16 @@ private boolean increaseAllocation; private List containersToKill; + public Resource getBlockedRequestResource() { + return blockedRequestResource; + } + + public void setBlockedRequestResource(Resource blockedRequestResource) { + this.blockedRequestResource = blockedRequestResource; + } + + private Resource blockedRequestResource = null; + public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, false, false); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..832cf45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -892,9 +893,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } // if our queue cannot access this node, just return + CSAssignment nullAssignment = new CSAssignment( + Resources.createResource(0, 0), NodeType.NODE_LOCAL); if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } // Check if this queue need more resource, simply skip allocation if this @@ -906,7 +909,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } for (Iterator assignmentIterator = @@ -917,7 +920,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } Resource userLimit = @@ -936,7 +939,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, CSAssignment assignment = application.assignContainers(clusterResource, node, currentResourceLimits, schedulingMode, null); - + Resource blkedResource = assignment.getBlockedRequestResource(); + if(blkedResource !=null) { + assignment.setBlockedRequestResource(Resources.subtract(blkedResource, + currentResourceLimits.getHeadroom())); + } if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " + application.getApplicationId()); @@ -975,13 +982,16 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } else if (assignment.getSkipped()) { application.updateNodeInfoForAMDiagnostics(node); } else { + if(assignment.getBlockedRequestResource() != null) { + return assignment; + } // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } } - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } protected Resource getHeadroom(User user, Resource queueCurrentLimit, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..75642c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -385,6 +385,9 @@ private synchronized void removeApplication(ApplicationId applicationId, public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + CSAssignment nullAssignment = new CSAssignment(Resources.createResource(0, + 0), NodeType.NODE_LOCAL); + // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { @@ -393,7 +396,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it is not able to access partition=" + node .getPartition()); } - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } // Check if this queue need more resource, simply skip allocation if this @@ -405,7 +408,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } - return CSAssignment.NULL_ASSIGNMENT; + return nullAssignment; } CSAssignment assignment = @@ -474,6 +477,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, " cluster=" + clusterResource); } else { + if(assignedToChild.getBlockedRequestResource() != null) { + assignment.setBlockedRequestResource( + assignedToChild.getBlockedRequestResource()); + } break; } @@ -572,7 +579,10 @@ private synchronized CSAssignment assignContainersToChildQueues( new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); printChildQueues(); - + ResourceLimits blockedLimits = + new ResourceLimits(Resource.newInstance(0,0)); + ResourceLimits finalBlockedLimits = + new ResourceLimits(Resource.newInstance(0,0)); // Try to assign to most 'under-served' sub-queue for (Iterator iter = sortAndGetChildrenAllocationIterator(node); iter .hasNext();) { @@ -584,9 +594,12 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); - - assignment = childQueue.assignContainers(cluster, node, + getResourceLimitsOfChild( + childQueue, cluster, limits, node.getPartition()); + childLimits.setLimit(Resources.max( + resourceCalculator,cluster, Resources.subtract(childLimits.getLimit(), + blockedLimits.getLimit()), Resources.none())); + assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -611,9 +624,17 @@ private synchronized CSAssignment assignContainersToChildQueues( } } break; + } else { + if(assignment.getBlockedRequestResource() != null) { + blockedLimits = new ResourceLimits( + assignment.getBlockedRequestResource()); + finalBlockedLimits = new ResourceLimits( + Resources.add(finalBlockedLimits.getLimit(), + assignment.getBlockedRequestResource())); + } } } - + assignment.setBlockedRequestResource(finalBlockedLimits.getLimit()); return assignment; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..0b191eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -110,6 +110,14 @@ protected CSAssignment getCSAssignmentFromAllocateResult( } assignment.setContainersToKill(result.getToKillContainers()); + } else { + if(result.state.equals(ContainerAllocation.QUEUE_SKIPPED.state)) { + Resource resourceBlocked = result.getResourceBlocked(); + if(resourceBlocked != null) { + assignment.setBlockedRequestResource(resourceBlocked); + assignment.setResource(result.getResourceToBeAllocated()); + } + } } return assignment; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 8f749f6..2ec204d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -55,6 +55,7 @@ RMContainer containerToBeUnreserved; private Resource resourceToBeAllocated = Resources.none(); + private Resource resourceBlocked = null; AllocationState state; NodeType containerNodeType = NodeType.NODE_LOCAL; NodeType requestNodeType = NodeType.NODE_LOCAL; @@ -68,6 +69,15 @@ public ContainerAllocation(RMContainer containerToBeUnreserved, this.state = state; } + public ContainerAllocation(RMContainer containerToBeUnreserved, + Resource resourceToBeAllocated, + Resource resourceBlocked, AllocationState state) { + this.containerToBeUnreserved = containerToBeUnreserved; + this.resourceToBeAllocated = resourceToBeAllocated; + this.resourceBlocked = resourceBlocked; + this.state = state; + } + public RMContainer getContainerToBeUnreserved() { return containerToBeUnreserved; } @@ -98,4 +108,12 @@ public void setToKillContainers(List toKillContainers) { public List getToKillContainers() { return toKillContainers; } + + public Resource getResourceBlocked() { + return resourceBlocked; + } + + public void setResourceBlocked(Resource resourceBlocked) { + this.resourceBlocked = resourceBlocked; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index aae5292..7881982 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -139,7 +139,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, LOG.debug("cannot allocate required resource=" + required + " because of headroom"); } - return ContainerAllocation.QUEUE_SKIPPED; + return new ContainerAllocation(null, null, required, + AllocationState.QUEUE_SKIPPED); } // Inform the application it is about to get a scheduling opportunity diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a88abe7..9dd954c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -87,6 +87,13 @@ public static Resource newResource(int mem) { return rs; } + public static Resource newResource(int mem, int vCores) { + Resource rs = recordFactory.newRecordInstance(Resource.class); + rs.setMemory(mem); + rs.setVirtualCores(vCores); + return rs; + } + public static Resource newUsedResource(Resource total) { Resource rs = recordFactory.newRecordInstance(Resource.class); rs.setMemory((int)(Math.random() * total.getMemorySize())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 7c34292..dc6d96f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. + ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -164,6 +166,12 @@ private static final String B3 = B + ".b3"; private static float A_CAPACITY = 10.5f; private static float B_CAPACITY = 89.5f; + private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; + private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2"; + private static final String X1 = P1 + ".x1"; + private static final String X2 = P1 + ".x2"; + private static final String Y1 = P2 + ".y1"; + private static final String Y2 = P2 + ".y2"; private static float A1_CAPACITY = 30; private static float A2_CAPACITY = 70; private static float B1_CAPACITY = 79.2f; @@ -409,7 +417,52 @@ private CapacitySchedulerConfiguration setupQueueConfiguration( LOG.info("Setup top-level queues a and b"); return conf; } - + + private CapacitySchedulerConfiguration setupBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setUserLimitFactor(A, 100); + conf.setUserLimitFactor(B, 100); + conf.setMaximumCapacity(A, 100); + conf.setMaximumCapacity(B, 100); + LOG.info("Setup top-level queues a and b"); + return conf; + } + + private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"p1", "p2"}); + + conf.setCapacity(P1, 50f); + conf.setMaximumCapacity(P1, 50f); + conf.setCapacity(P2, 50f); + conf.setMaximumCapacity(P2, 100f); + // Define 2nd-level queues + conf.setQueues(P1, new String[] {"x1", "x2"}); + conf.setCapacity(X1, 80f); + conf.setMaximumCapacity(X1, 100f); + conf.setUserLimitFactor(X1, 2f); + conf.setCapacity(X2, 20f); + conf.setMaximumCapacity(X2, 100f); + conf.setUserLimitFactor(X2, 2f); + + conf.setQueues(P2, new String[]{"y1", "y2"}); + conf.setCapacity(Y1, 80f); + conf.setUserLimitFactor(Y1, 2f); + conf.setCapacity(Y2, 20f); + conf.setUserLimitFactor(Y2, 2f); + return conf; + } + @Test public void testMaximumCapacitySetup() { float delta = 0.0000001f; @@ -3373,4 +3426,236 @@ public void handle(Event event) { Assert.assertEquals(availableResource.getMemorySize(), 0); Assert.assertEquals(availableResource.getVirtualCores(), 0); } + + @Test + public void testCSReservationWithRootUnblocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupOtherBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ParentQueue q = (ParentQueue) cs.getQueue("p1"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB, 8), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB, 8), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "x1", "userX1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "x2", "userX2"); + ApplicationAttemptId appAttemptId3 = + appHelper(rm, cs, 100, 3, "y1", "userY1"); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest y1Req = null; + ResourceRequest x1Req = null; + ResourceRequest x2Req = null; + for(int i=0; i < 4; i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("Y1 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("y1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + for(int i=0; i < 7; i++) { + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("X1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("x1").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + x2Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(x2Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 0", 0, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + //this assign should fail + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + //this should get thru + for (int i=0;i<4;i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + //Free a container from X1 + ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2); + cs.handle(new ContainerExpiredSchedulerEvent(containerId)); + + //Schedule pending request + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 2 GB", 2 * GB, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + assertEquals("Root Used Resource should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + @Test + public void testCSQueueBlocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue q = (LeafQueue) cs.getQueue("a"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + //add app begin + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "a", "user1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "b", "user2"); + //add app end + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + //This will allocate for app1 + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + ResourceRequest r2 = null; + for (int i =0; i < 13; i++) { + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 2 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + + cs.allocate(appAttemptId2, Collections.singletonList(r2), + Collections.emptyList(), null, null, null, null); + CapacityScheduler.schedule(cs); + //Check blocked Resource + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10); + ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11); + + cs.handle(new ContainerExpiredSchedulerEvent(containerId1)); + cs.handle(new ContainerExpiredSchedulerEvent(containerId2)); + + CapacityScheduler.schedule(cs); + assertEquals("A Used Resource should be 2 GB", 4 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 12 GB", 12 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + assertEquals("Used Resource on Root should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs, + int clusterTs, int appId, String queue, + String user) { + ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, appId); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt1.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + when(attempt1.getSubmissionContext()).thenReturn(submissionContext); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, queue, user); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + return appAttemptId1; + } }