diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index a8c62fd..c27b024 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -36,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -123,7 +124,8 @@
         }
 
         // preempt other containers
-        Resource skippedAMSize = Resource.newInstance(0, 0);
+        Map<String, Resource> skippedAMSizePerPartition =
+            new HashMap<String, Resource>();
         Iterator<FiCaSchedulerApp> desc =
             leafQueue.getOrderingPolicy().getPreemptionIterator();
         while (desc.hasNext()) {
@@ -136,20 +138,22 @@
           }
           preemptFrom(fc, clusterResource, resToObtainByPartition,
-              skippedAMContainerlist, skippedAMSize, selectedCandidates,
+              skippedAMContainerlist, skippedAMSizePerPartition, selectedCandidates,
               totalPreemptionAllowed);
         }
 
         // Can try preempting AMContainers (still saving atmost
         // maxAMCapacityForThisQueue AMResource's) if more resources are
         // required to be preemptionCandidates from this Queue.
-        Resource maxAMCapacityForThisQueue = Resources.multiply(
-            Resources.multiply(clusterResource,
-                leafQueue.getAbsoluteCapacity()),
-            leafQueue.getMaxAMResourcePerQueuePercent());
+        Map<String, Resource> maxAMCapacityPerPartition =
+            new HashMap<String, Resource>();
+        for (String partition : resToObtainByPartition.keySet()) {
+          maxAMCapacityPerPartition.put(partition, Resources
+              .clone(leafQueue.getAMResourceLimitPerPartition(partition)));
+        }
 
         preemptAMContainers(clusterResource, selectedCandidates,
             skippedAMContainerlist,
-            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
+            resToObtainByPartition, skippedAMSizePerPartition, maxAMCapacityPerPartition,
             totalPreemptionAllowed);
       }
     }
@@ -171,17 +175,27 @@
   private void preemptAMContainers(Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
       List<RMContainer> skippedAMContainerlist,
-      Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
-      Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
+      Map<String, Resource> resToObtainByPartition,
+      Map<String, Resource> skippedAMSizePerPartition,
+      Map<String, Resource> maxAMCapacityPerPartition,
+      Resource totalPreemptionAllowed) {
     for (RMContainer c : skippedAMContainerlist) {
       // Got required amount of resources for preemption, can stop now
       if (resToObtainByPartition.isEmpty()) {
         break;
       }
-      // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
+
+      Resource maxAMCapacity = maxAMCapacityPerPartition
+          .get(c.getNodeLabelExpression());
+      // skippedAMSizePerPartition is guaranteed to have an entry for this
+      // container's partition, since preemptFrom keeps both maps in sync.
+      Resource skippedAMSize = skippedAMSizePerPartition
+          .get(c.getNodeLabelExpression());
+
+      // Once skippedAMSize reaches down to maxAMCapacityPerPartition,
       // container selection iteration for preemption will be stopped.
       if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
-          maxAMCapacityForThisQueue)) {
+          (maxAMCapacity == null) ?
+              Resources.unbounded() : maxAMCapacity)) {
         break;
       }
 
@@ -262,9 +276,10 @@ private String getPartitionByNodeId(NodeId nodeId) {
    * to preempt (after unreserving all reservation for that app).
    */
   @SuppressWarnings("unchecked")
-  private void preemptFrom(FiCaSchedulerApp app,
-      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
-      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+  private void preemptFrom(FiCaSchedulerApp app, Resource clusterResource,
+      Map<String, Resource> resToObtainByPartition,
+      List<RMContainer> skippedAMContainerlist,
+      Map<String, Resource> skippedAMSizePerPartition,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
       Resource totalPreemptionAllowed) {
     ApplicationAttemptId appId = app.getApplicationAttemptId();
@@ -323,6 +338,15 @@ private void preemptFrom(FiCaSchedulerApp app,
       // Skip AM Container from preemption for now.
       if (c.isAMContainer()) {
         skippedAMContainerlist.add(c);
+        // Accumulate skipped AM size per partition, creating the entry on
+        // first use for this container's node label.
+        Resource skippedAMSize = skippedAMSizePerPartition
+            .get(c.getNodeLabelExpression());
+        if (skippedAMSize == null) {
+          skippedAMSize = Resource.newInstance(0, 0);
+          skippedAMSizePerPartition.put(c.getNodeLabelExpression(),
+              skippedAMSize);
+        }
         Resources.addTo(skippedAMSize, c.getAllocatedResource());
         continue;
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 7c8fb2a..d5bbd60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -92,6 +92,7 @@
   EventHandler<SchedulerEvent> mDisp = null;
   ProportionalCapacityPreemptionPolicy policy = null;
   Resource clusterResource = null;
+  Resource setAMResourceLimit = Resource.newInstance(0, 0);
 
   @SuppressWarnings("unchecked")
   @Before
@@ -521,8 +522,11 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
       ru.setPending(partitionName, pending);
       if (!isParent(queueExprArray, idx)) {
         LeafQueue lq = (LeafQueue) queue;
-        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
-            isA(String.class))).thenReturn(pending);
+        when(lq.getTotalPendingResourcesConsideringUserLimit(
+            isA(Resource.class), isA(String.class))).thenReturn(pending);
+        when(lq.getAMResourceLimitPerPartition(partitionName))
+            .thenReturn((setAMResourceLimit.equals(Resources.none()))
+                ? Resource.newInstance(0, 0) : setAMResourceLimit);
       }
 
       ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index e3ef8c2..63ad363 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -96,7 +96,7 @@
   int appAlloc = 0;
   boolean setAMContainer = false;
   boolean setLabeledContainer = false;
-  float setAMResourcePercent = 0.0f;
+  Resource setAMResourceLimit = Resource.newInstance(0, 0);
   Random rand = null;
   Clock mClock = null;
   CapacitySchedulerConfiguration conf = null;
@@ -882,10 +882,10 @@ public void testAMResourcePercentForSkippedAMContainers() {
       { 2, 0, 0 }, // subqueues
     };
     setAMContainer = true;
-    setAMResourcePercent = 0.5f;
+    setAMResourceLimit = Resources.createResource(5, 0);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-
-    // AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb.
-    // Total used AM container size is 20GB, hence 2 AM container has
-    // to be preempted as Queue Capacity is 10Gb.
+
+    // The queue's AM resource limit is set to 5GB. The total size of used
+    // AM containers is 20GB, so 2 AM containers have to be preempted as
+    // the queue capacity is 10GB.
@@ -1260,8 +1260,9 @@ public Object answer(InvocationOnMock invocation) {
       }
     });
     when(lq.getOrderingPolicy()).thenReturn(so);
-    if(setAMResourcePercent != 0.0f){
-      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+    if (setAMResourceLimit != null) {
+      when(lq.getAMResourceLimitPerPartition("")).thenReturn(
+          setAMResourceLimit);
     }
     p.getChildQueues().add(lq);
     return lq;
@@ -1323,6 +1324,7 @@ RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
     when(mC.getContainer()).thenReturn(c);
     when(mC.getApplicationAttemptId()).thenReturn(appAttId);
     when(mC.getAllocatedResource()).thenReturn(r);
+    when(mC.getNodeLabelExpression()).thenReturn("");
     if (priority.AMCONTAINER.getValue() == cpriority) {
       when(mC.isAMContainer()).thenReturn(true);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e31a889..7e7d6f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -614,4 +615,68 @@ public void testNodePartitionPreemptionWithVCoreResource() throws IOException {
     verify(mDisp, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
+
+  @Test
+  public void testNodePartitionPreemptionOfAMContainerWithLimits()
+      throws IOException {
+    /**
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Both a and b can access partition x, and their guaranteed capacities
+     * on x are 2:98. There are two nodes: n1 has 100 of partition x, and
+     * n2 has 100 of NO_LABEL.
+     *
+     * app1-app5 run in queue a, and each of them uses 20 resources (x).
+     *
+     * b has 100 pending resources on x.
+     *
+     * After preemption, 20 resources should be preempted from app4/app5
+     * and 19 from each of app1-app3, so app4's and app5's AM containers
+     * will be preempted. Ideally app3's AM container would be preempted as
+     * well, but since the AM resource limit is 3, PCPP ensures that at
+     * least 3 AM containers keep running.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+        "-a(=[50 100 0 0],x=[2 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[98 100 0 100])"; // b
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app2 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app3 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app4 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app5 in a
+        + "(1,1,n1,x,20,false);"; // uses 20 resource
+
+    setAMResourceLimit = Resources.createResource(3, 0);
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 19 preempted from each of app1-app3
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    // 20 preempted from each of app4/app5, including their AM containers
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
+  }
 }
-- 
2.7.4 (Apple Git-66)
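
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained
illustration of the per-partition bookkeeping this change introduces. It accumulates
the size of skipped AM containers per node label, then keeps preempting AM containers
only while that accumulated size stays above the partition's AM resource limit; a
missing limit behaves like Resources.unbounded(), so those AMs are never preempted.
The class and map names are invented for illustration, and plain integers stand in
for YARN's Resource/Resources types; the real logic lives in FifoCandidatesSelector.

    import java.util.HashMap;
    import java.util.Map;

    public class PerPartitionAmLimitSketch {
      // Memory (GB) of AM containers skipped so far, keyed by partition.
      private final Map<String, Integer> skippedAmSizePerPartition =
          new HashMap<>();
      // Per-partition AM limit, analogous to getAMResourceLimitPerPartition().
      private final Map<String, Integer> maxAmCapacityPerPartition =
          new HashMap<>();

      // Mirrors the preemptFrom() change: get-or-create the accumulator for
      // the container's partition and add the AM container's allocation.
      void recordSkippedAm(String partition, int amMemGb) {
        skippedAmSizePerPartition.merge(partition, amMemGb, Integer::sum);
      }

      // Mirrors the preemptAMContainers() change: an AM may be preempted only
      // while the skipped-AM total for its partition is still above the
      // partition's limit; a null limit acts like Resources.unbounded().
      boolean mayPreemptAm(String partition) {
        Integer limit = maxAmCapacityPerPartition.get(partition);
        Integer skipped = skippedAmSizePerPartition.get(partition);
        return limit != null && skipped != null && skipped > limit;
      }

      public static void main(String[] args) {
        PerPartitionAmLimitSketch s = new PerPartitionAmLimitSketch();
        s.maxAmCapacityPerPartition.put("x", 3);
        for (int i = 0; i < 5; i++) {
          s.recordSkippedAm("x", 1); // five 1GB AM containers on partition x
        }
        // 5GB skipped vs. a 3GB limit: exactly two AMs may be preempted
        // before the skipped total drops to the limit, leaving three AMs.
        while (s.mayPreemptAm("x")) {
          s.skippedAmSizePerPartition.merge("x", -1, Integer::sum);
          System.out.println("preempted one AM, skipped now = "
              + s.skippedAmSizePerPartition.get("x"));
        }
      }
    }

Under these assumptions the loop stops after two preemptions, which matches the new
test above: with a limit of 3 and five AMs of size 1 on partition x, only app4's and
app5's AM containers are preempted.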