diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index d9e9091bc86..b294ea8ab0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -100,7 +101,7 @@ // sure such containers will be preemptionCandidates first Map> ignorePartitionExclusivityContainers = leafQueue.getIgnoreExclusivityRMContainers(); - for (String partition : resToObtainByPartition.keySet()) { + for (String partition : new HashSet(resToObtainByPartition.keySet())) { if (ignorePartitionExclusivityContainers.containsKey(partition)) { TreeSet rmContainers = ignorePartitionExclusivityContainers.get(partition); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java index 27208020185..f4cb0eb7a09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java @@ -246,4 +246,58 @@ public void testInterQueuePreemptionWithStrictAndRelaxedDRF() new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testResourceTypesInterQueuePreemptionWithThreePartitions() throws IOException { + /* + * root + * / \ \ + * batch a b + * + * Each of batch, a, b have 100:100 resources + * All partitions are non-exclusive. + * Total cluster resource have 300:300 + * app1 on batch uses mem=100, cpu=100 from batch, mem=50, cpu=50 from a, mem=50, cpu=50 from b + * app2 on a uses mem=50, cpu=50 from a and requests mem=50, cpu=50 more from a + * app3 on a uses mem=50, cpu=50 from b and requests mem=50, cpu=50 more from b + * + * We expect it can preempt from app1 successfully + */ + String labelsConfig = "=100:100,false;" + // default partition (100, non-exclusive) + "x=100:100,false;" + // x partition (100, non-exclusive) + "y=100:100,false;"; // y partition (100, non-exclusive) + String nodesConfig = "n1= res=50:50;n2= res=50:50;" + // n1~n2 are default partition + "x1=x res=50:50;x2=x res=50:50;" + // x1~x2 are x partition + "y1=y res=50:50;y2=y res=50:50;"; // y1~y2 are y partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100:100 100:100 100:100 0:0],x=[100:100 100:100 100:100 50:50]," + + "y=[100:100 100:100 100:100 50:50]);" + //root + "-batch(=[100:100 100:100 100:100 0:0],x=[0:0 0:0 50:50 0:0],y=[0:0 0:0 50:50 0:0]);" + // batch queue + "-a(=[0:0 0:0 0:0 0:0],x=[100:100 100:100 50:50 50:50],y=[0:0 0:0 0:0 0:0]);" + // x queue + "-b(=[0:0 0:0 0:0 0:0],x=[0:0 0:0 0:0 0:0],y=[100:100 100:100 50:50 50:50]);"; // y queue + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "batch\t" + // app1 in batch + "(1,50:50,n1,,1,false)(1,50:50,n2,,1,false)" + // 100 * default in n1~n2 + "(1,50:50,x1,,1,false)" + // 50 * x in x1 + "(1,50:50,y1,,1,false);" + // 50 * y in y1 + "a\t" + // app2 in x + "(1,50:50,x2,x,1,false);" + // 50 * x in x2 + "b\t" + // app3 in y + "(1,50:50,y2,y,1,false);"; // 50 * y in y2 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(2)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java index ae4ff5a663e..b0bb675ad66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java @@ -211,6 +211,15 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue .getQueuePath())); + Boolean isLeafQueue = !isParent(queueExprArray, idx); + if (isLeafQueue) { + LeafQueue lq = (LeafQueue) queue; + when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), + isA(String.class), eq(false))).thenReturn(Resources.none()); + when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), + isA(String.class), eq(true))).thenReturn(Resources.none()); + } + // Setup other fields like used resource, guaranteed resource, etc. String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")")); for (String s : capacitySettingStr.split(",")) { @@ -237,8 +246,6 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, qc.setAbsoluteMaximumCapacity(partitionName, absMax); qc.setAbsoluteUsedCapacity(partitionName, absUsed); qc.setUsedCapacity(partitionName, used); - qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim())); - qr.setEffectiveMinResource(parseResourceFromString(values[0].trim())); qr.setEffectiveMaxResource(partitionName, parseResourceFromString(values[1].trim())); qr.setEffectiveMinResource(partitionName, @@ -255,12 +262,12 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, reserved = parseResourceFromString(values[4].trim()); ru.setReserved(partitionName, reserved); } - if (!isParent(queueExprArray, idx)) { + if (isLeafQueue) { LeafQueue lq = (LeafQueue) queue; when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), - isA(String.class), eq(false))).thenReturn(pending); + eq(partitionName), eq(false))).thenReturn(pending); when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), - isA(String.class), eq(true))).thenReturn( + eq(partitionName), eq(true))).thenReturn( Resources.subtract(pending, reserved)); } ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));