diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 2857379..f84d0f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -61,7 +61,7 @@ public ResourceUsage() { //CACHED_USED and CACHED_PENDING may be read by anyone, but must only //be written by ordering policies USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), - CACHED_PENDING(5), AMLIMIT(6); + CACHED_PENDING(5), AMLIMIT(6), BLOCKED(7); private int idx; @@ -190,6 +190,30 @@ public void incPending(String label, Resource res) { _inc(label, ResourceType.PENDING, res); } + public Resource getBlocked(String label) { + return _get(label, ResourceType.BLOCKED); + } + + public void incBlocked(String label, Resource res) { + _inc(label, ResourceType.BLOCKED, res); + } + public void incBlocked(Resource res) { + _inc(NL, ResourceType.BLOCKED, res); + } + + public void decBlocked(Resource res) { + decBlocked(NL, res); + } + public void decBlocked(String label, Resource res) { + _dec(label, ResourceType.BLOCKED, res); + } + public void setBlocked(Resource res) { + setBlocked(NL, res); + } + + public void setBlocked(String label, Resource res) { + _set(label, ResourceType.BLOCKED, res); + } public void incPending(Resource res) { incPending(NL, res); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index dc90c5b..b71fceb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -360,6 +360,13 @@ synchronized void allocateResource(Resource clusterResource, protected synchronized void releaseResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { queueUsage.decUsed(nodePartition, resource); + if(!Resources.equals(queueUsage.getBlocked(nodePartition), + Resources.none())) { + Resource blocked = Resources.componentwiseMax(Resources.subtractFrom( + queueUsage.getBlocked(nodePartition), resource), + Resource.newInstance(0, 0)); + queueUsage.setBlocked(nodePartition, blocked); + } CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, nodePartition); @@ -428,7 +435,7 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) { parentQ.getPreemptionDisabled()); } - private Resource getCurrentLimitResource(String nodePartition, + protected Resource getCurrentLimitResource(String nodePartition, Resource clusterResource, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { @@ -478,7 +485,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, currentResourceLimits, schedulingMode); Resource nowTotalUsed = queueUsage.getUsed(nodePartition); - + Resource blocked = queueUsage.getBlocked(nodePartition); // Set headroom for currentResourceLimits: // When queue is a parent queue: Headroom = limit - used + killable // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) @@ -487,6 +494,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, usedExceptKillable = Resources.subtract(nowTotalUsed, getTotalKillableResource(nodePartition)); } + usedExceptKillable = Resources.addTo(usedExceptKillable, blocked); currentResourceLimits.setHeadroom( Resources.subtract(currentLimitResource, usedExceptKillable)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 6406efe..5307206 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -46,6 +46,16 @@ private boolean increaseAllocation; private List containersToKill; + public Resource getBlockedRequestResource() { + return blockedRequestResource; + } + + public void setBlockedRequestResource(Resource blockedRequestResource) { + this.blockedRequestResource = blockedRequestResource; + } + + private Resource blockedRequestResource = null; + public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, false, false); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..3191ce5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -936,7 +937,14 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, CSAssignment assignment = application.assignContainers(clusterResource, node, currentResourceLimits, schedulingMode, null); - + Resource blkedResource = assignment.getBlockedRequestResource(); + if(blkedResource !=null) + assignment.setBlockedRequestResource(Resources.subtract(blkedResource, + currentResourceLimits.getHeadroom())); + else { + assignment.setBlockedRequestResource(null); + queueUsage.setBlocked(node.getPartition(), Resources.none()); + } if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " + application.getApplicationId()); @@ -975,6 +983,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } else if (assignment.getSkipped()) { application.updateNodeInfoForAMDiagnostics(node); } else { + if(assignment.getBlockedRequestResource() != null) { + CSAssignment blockedAssignment = new CSAssignment( + Resources.createResource(0, 0), NodeType.NODE_LOCAL); + blockedAssignment.setBlockedRequestResource(assignment + .getBlockedRequestResource()); + return blockedAssignment; + } // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications return CSAssignment.NULL_ASSIGNMENT; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..155dd7d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -474,6 +474,21 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, " cluster=" + clusterResource); } else { + if(assignedToChild.getBlockedRequestResource() != null) { + assignment.setBlockedRequestResource( + assignedToChild.getBlockedRequestResource()); + Resource currentLimitResource = + getCurrentLimitResource(node.getPartition(), clusterResource, + resourceLimits, schedulingMode); + Resource usedAndBlocked = Resources.add( + queueUsage.getUsed(node.getPartition()), + assignedToChild.getBlockedRequestResource()); + if(Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + usedAndBlocked, currentLimitResource)) { + setTotalBlockedResource( + assignedToChild.getBlockedRequestResource(),node); + } + } break; } @@ -500,6 +515,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return assignment; } + private void setTotalBlockedResource(Resource resourceTobeAdded, + FiCaSchedulerNode node) { + queueUsage.incBlocked(node.getPartition(), resourceTobeAdded); + } + private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { // Two conditions need to meet when trying to allocate: // 1) Node doesn't have reserved container @@ -597,7 +617,8 @@ private synchronized CSAssignment assignContainersToChildQueues( // If we do assign, remove the queue and re-insert in-order to re-sort if (Resources.greaterThan( resourceCalculator, cluster, - assignment.getResource(), Resources.none())) { + assignment.getResource(), Resources.none()) || + (assignment.getBlockedRequestResource() != null)) { // Only update childQueues when we doing non-partitioned node // allocation. if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..c194fa3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -110,6 +110,13 @@ protected CSAssignment getCSAssignmentFromAllocateResult( } assignment.setContainersToKill(result.getToKillContainers()); + } else { + if(result.state.equals(ContainerAllocation.QUEUE_SKIPPED.state)) { + if(result.resourceBlocked != Resources.none()) { + assignment.setBlockedRequestResource(result.resourceBlocked); + assignment.setResource(result.getResourceToBeAllocated()); + } + } } return assignment; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 8f749f6..549e115 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -55,6 +55,7 @@ RMContainer containerToBeUnreserved; private Resource resourceToBeAllocated = Resources.none(); + public Resource resourceBlocked = null; AllocationState state; NodeType containerNodeType = NodeType.NODE_LOCAL; NodeType requestNodeType = NodeType.NODE_LOCAL; @@ -67,6 +68,14 @@ public ContainerAllocation(RMContainer containerToBeUnreserved, this.resourceToBeAllocated = resourceToBeAllocated; this.state = state; } + public ContainerAllocation(RMContainer containerToBeUnreserved, + Resource resourceToBeAllocated, + Resource resourceBlocked, AllocationState state) { + this.containerToBeUnreserved = containerToBeUnreserved; + this.resourceToBeAllocated = resourceToBeAllocated; + this.resourceBlocked = resourceBlocked; + this.state = state; + } public RMContainer getContainerToBeUnreserved() { return containerToBeUnreserved; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index aae5292..9e59bb7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -139,7 +139,8 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, LOG.debug("cannot allocate required resource=" + required + " because of headroom"); } - return ContainerAllocation.QUEUE_SKIPPED; + return new ContainerAllocation(null,null, required, + AllocationState.QUEUE_SKIPPED); } // Inform the application it is about to get a scheduling opportunity diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a88abe7..2c5a720 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -87,6 +87,13 @@ public static Resource newResource(int mem) { return rs; } + public static Resource newResource(int mem,int vCores) { + Resource rs = recordFactory.newRecordInstance(Resource.class); + rs.setMemory(mem); + rs.setVirtualCores(vCores); + return rs; + } + public static Resource newUsedResource(Resource total) { Resource rs = recordFactory.newRecordInstance(Resource.class); rs.setMemory((int)(Math.random() * total.getMemorySize())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 7c34292..01f60f0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. + ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -164,6 +166,12 @@ private static final String B3 = B + ".b3"; private static float A_CAPACITY = 10.5f; private static float B_CAPACITY = 89.5f; + private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; + private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2"; + private static final String X1 = P1 + ".x1"; + private static final String X2 = P1 + ".x2"; + private static final String Y1 = P2 + ".y1"; + private static final String Y2 = P2 + ".y2"; private static float A1_CAPACITY = 30; private static float A2_CAPACITY = 70; private static float B1_CAPACITY = 79.2f; @@ -409,7 +417,51 @@ private CapacitySchedulerConfiguration setupQueueConfiguration( LOG.info("Setup top-level queues a and b"); return conf; } - + + private CapacitySchedulerConfiguration setupBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setUserLimitFactor(A, 100); + conf.setUserLimitFactor(B, 100); + conf.setMaximumCapacity(A, 100); + conf.setMaximumCapacity(B, 100); + LOG.info("Setup top-level queues a and b"); + return conf; + } + + private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"p1", "p2"}); + + conf.setCapacity(P1, 50f); + conf.setMaximumCapacity(P1, 50f); + conf.setCapacity(P2, 50f); + conf.setMaximumCapacity(P2, 100f); + // Define 2nd-level queues + conf.setQueues(P1, new String[] {"x1", "x2"}); + conf.setCapacity(X1, 80f); + conf.setMaximumCapacity(X1,100f); + conf.setUserLimitFactor(X1, 2f); + conf.setCapacity(X2, 20f); + conf.setMaximumCapacity(X2, 100f); + conf.setUserLimitFactor(X2, 2f); + + conf.setQueues(P2, new String[]{"y1", "y2"}); + conf.setCapacity(Y1, 80f); + conf.setUserLimitFactor(Y1, 2f); + conf.setCapacity(Y2, 20f); + conf.setUserLimitFactor(Y2, 2f); + return conf; + } + @Test public void testMaximumCapacitySetup() { float delta = 0.0000001f; @@ -3373,4 +3425,270 @@ public void handle(Event event) { Assert.assertEquals(availableResource.getMemorySize(), 0); Assert.assertEquals(availableResource.getVirtualCores(), 0); } + + @Test + public void testCSReservationWithRootUnblocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupOtherBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ParentQueue q = (ParentQueue) cs.getQueue("p1"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB, 8), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB, 8), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "x1", "userX1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "x2", "userX2"); + ApplicationAttemptId appAttemptId3 = + appHelper(rm, cs, 100, 3, "y1", "userY1"); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest y1Req = null; + ResourceRequest x1Req = null; + ResourceRequest x2Req = null; + for(int i=0;i<4;i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("Y1 Used Resource should be 4 GB",4 * GB, + cs.getQueue("y1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + for(int i=0;i<7;i++) { + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("X1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("x1").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + assertEquals("P1 Blocked Resource should be 0", 0, + cs.getQueue("p1").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + + x2Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(x2Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 0", 0, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + assertEquals("P1 Blocked Resource should be 1 GB", 1 * GB, + cs.getQueue("p1").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + assertEquals("X2 Blocked Resource should be 0", 0, + cs.getQueue("x2").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + assertEquals("Root Queue Blocked Resource should be 0", 0, + cs.getRootQueue().getQueueResourceUsage().getBlocked(""). + getMemorySize()); + //this assign should fail + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + //this should get thru + for (int i=0;i<4;i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + //Free a container from X1 + ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2); + cs.handle(new ContainerExpiredSchedulerEvent(containerId)); + assertEquals("P1 Blocked Resource should be 0", 0, + cs.getQueue("p1").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + assertEquals("X2 Blocked Resource should be 0", 0, + cs.getQueue("x2").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + assertEquals("Root Queue Blocked Resource should be 0", 0, + cs.getRootQueue().getQueueResourceUsage().getBlocked(""). + getMemorySize()); + //Schedule pending request + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 2 GB", 2 * GB, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + assertEquals("Root Used Resource should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + + } + + @Test + public void testCSReservationInversion() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue q = (LeafQueue) cs.getQueue("a"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, MockNodes.newResource(8 * GB), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + //add app begin + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "a", "user1"); + ApplicationAttemptId appAttemptId2 = appHelper(rm,cs,100,2,"b","user2"); + //add app end + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + //This will allocate for app1 + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + ResourceRequest r2 = null; + for (int i =0; i < 13; i++) { + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemory()); + assertEquals("B Used Resource should be 2 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemory()); + r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + + cs.allocate + (appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + //Check blocked Resource + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + assertEquals("Blocked Resource on Root should be 1 GB", 1 * GB, + cs.getRootQueue().getQueueResourceUsage().getBlocked(""). + getMemorySize()); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10); + ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11); + + cs.handle(new ContainerExpiredSchedulerEvent(containerId1)); + cs.handle(new ContainerExpiredSchedulerEvent(containerId2)); + assertEquals("Blocked Resource on Root should be 0 GB", 0, + cs.getRootQueue().getQueueResourceUsage().getBlocked(""). + getMemorySize()); + + CapacityScheduler.schedule(cs); + assertEquals("A Used Resource should be 2 GB", 4 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 12 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + assertEquals("Used Resource on Root should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + assertEquals("Blocked Resource on Root should be 0", 0, + cs.getRootQueue().getQueueResourceUsage().getBlocked(""). + getMemorySize()); + assertEquals("Blocked Resource on A should be 0", 0 , + cs.getQueue("a").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + assertEquals("Blocked Resource on B should be 0", 0 , + cs.getQueue("b").getQueueResourceUsage().getBlocked(""). + getMemorySize()); + } + + private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs, + int clusterTs, int appId, String queue, + String user) { + ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, appId); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt1.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + when(attempt1.getSubmissionContext()).thenReturn(submissionContext); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, queue, user); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + return appAttemptId1; + } }