riMap = new HashMap<>();
+ String RESOURCE_1 = "res1";
+
+ // Initialize mandatory resources
+ ResourceInformation memory = ResourceInformation.newInstance(
+ ResourceInformation.MEMORY_MB.getName(),
+ ResourceInformation.MEMORY_MB.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ ResourceInformation vcores = ResourceInformation.newInstance(
+ ResourceInformation.VCORES.getName(),
+ ResourceInformation.VCORES.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
+ riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
+ ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+ /**
+ * Queue structure is:
+ *
+ *
+ * root
+ * / \ \
+ * a b c
+ *
+ * A / B / C have 33.3 / 33.3 / 33.4 resources
+ * Total cluster resource have mem=30, cpu=18, GPU=6
+ * A uses mem=6, cpu=3, GPU=3
+ * B uses mem=6, cpu=3, GPU=3
+ * C is asking mem=1,cpu=1,GPU=1
+ *
+ * We expect it can preempt from one of the jobs
+ */
+ String labelsConfig =
+ "=30:18:6,true;";
+ String nodesConfig =
+ "n1= res=30:18:6;"; // n1 is default partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root
+ "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a
+ "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b
+ "-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a1
+ + "(1,2:2:1,n1,,3,false);" +
+ "b\t" // app2 in b2
+ + "(1,2:2:1,n1,,3,false)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(1)).handle(
+ argThat(new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(getAppAttemptId(1))));
+
+ riMap.remove(RESOURCE_1);
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 67c09cd7e5d..480885ad5fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -667,24 +667,24 @@ public void testNormalizeGuaranteeWithMultipleResource() throws IOException {
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root
- "-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a
- "--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1
- "--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2
- "-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b
- "--b1(=[25:5:4 100:20:10 0 20:10:4]);" + // b1
- "--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2
+ "-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a
+ "--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1
+ "--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2
+ "-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b
+ "--b1(=[25:5:4 100:20:10 0 20:40:4]);" + // b1
+ "--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2
String appsConfig=
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a1\t" // app1 in a1
- + "(1,8:9:1,n1,,10,false);" +
- "b2\t" // app2 in b2
- + "(1,2:1,n1,,10,false)"; // 80 of y
+ + "(1,8:9:1,n1,,10,false);" +
+ "b2\t" // app2 in b2
+ + "(1,2:1,n1,,10,false)"; // 80 of y
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(7)).handle(
- argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+ verify(mDisp, times(6)).handle(
+ argThat(new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(getAppAttemptId(1))));
riMap.remove(RESOURCE_1);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
index 0d6d350f001..66958740609 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
@@ -18,8 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Test;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -27,6 +36,12 @@
public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
extends ProportionalCapacityPreemptionPolicyMockFramework {
+ @Override
+ public void setup() {
+ rc = new DominantResourceCalculator();
+ super.setup();
+ }
+
@Test
public void testInterQueuePreemptionWithMultipleResource()
throws Exception {
@@ -65,4 +80,71 @@ public void testInterQueuePreemptionWithMultipleResource()
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
+
+ @Test
+ public void test3ResourceTypesInterQueuePreemption() throws IOException {
+ // Initialize resource map
+ Map riMap = new HashMap<>();
+ String RESOURCE_1 = "res1";
+
+ // Initialize mandatory resources
+ ResourceInformation memory = ResourceInformation.newInstance(
+ ResourceInformation.MEMORY_MB.getName(),
+ ResourceInformation.MEMORY_MB.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ ResourceInformation vcores = ResourceInformation.newInstance(
+ ResourceInformation.VCORES.getName(),
+ ResourceInformation.VCORES.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
+ riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
+ ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+ /**
+ * Queue structure is:
+ *
+ *
+ * root
+ * / \ \
+ * a b c
+ *
+ * A / B / C have 33.3 / 33.3 / 33.4 resources
+ * Total cluster resource have mem=30, cpu=18, GPU=6
+ * A uses mem=6, cpu=3, GPU=3
+ * B uses mem=6, cpu=3, GPU=3
+ * C is asking mem=1,cpu=1,GPU=1
+ *
+ * We expect it can preempt from one of the jobs
+ */
+ String labelsConfig =
+ "=30:18:6,true;";
+ String nodesConfig =
+ "n1= res=30:18:6;"; // n1 is default partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root
+ "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]);" + // a
+ "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]);" + // b
+ "-c(=[10:6:2 10:6:2 0:0:0 1:1:1])"; // c
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a1
+ + "(1,2:2:1,n1,,3,false);" +
+ "b\t" // app2 in b2
+ + "(1,2:2:1,n1,,3,false)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(1)).handle(
+ argThat(new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(getAppAttemptId(1))));
+
+ riMap.remove(RESOURCE_1);
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+ }
}