diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index 6375c4afd9c..f5c4058beb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -145,4 +145,9 @@ public Resource normalizeDown(Resource r, Resource stepFactor) { return Resources.createResource( roundDown((r.getMemorySize()), stepFactor.getMemorySize())); } + + @Override + public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { + return resource.getMemorySize() <= 0f; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 6fed23b4c2a..06ce1e09a67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -613,4 +613,17 @@ public Resource normalizeDown(Resource r, Resource stepFactor) { } return ret; } + + @Override + public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation resourceInformation = resource.getResourceInformation( + i); + if (resourceInformation.getValue() <= 0L) { + return true; + } + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 1c42126019e..0d901d34498 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -247,6 +247,15 @@ public abstract float divide( public abstract boolean isAnyMajorResourceZero(Resource resource); /** + * Check if resource has any major resource types (which are all NodeManagers + * included) a zero value or negative value. + * + * @param resource resource + * @return returns true if any resource is zero. + */ + public abstract boolean isAnyMajorResourceZeroOrNegative(Resource resource); + + /** * Get resource rand normalize down using step-factor * stepFactor. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 1c08844cf33..c99548dd8ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -552,6 +552,11 @@ public static boolean isAnyMajorResourceZero(ResourceCalculator rc, return rc.isAnyMajorResourceZero(resource); } + public static boolean isAnyMajorResourceZeroOrNegative(ResourceCalculator rc, + Resource resource) { + return rc.isAnyMajorResourceZeroOrNegative(resource); + } + public static Resource normalizeDown(ResourceCalculator calculator, Resource resource, Resource factor) { return calculator.normalizeDown(resource, factor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index f097e9c6291..4add73462f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -132,6 +133,16 @@ private static void deductPreemptableResourcePerApp( * map to hold preempted containers * @param totalPreemptionAllowed * total preemption allowed per round + * @param conservativeDRF + * should we do conservativeDRF preemption or not. + * When true: + * stop preempt container when any resource type < 0 for to- + * preempt. + * This is default preemption behavior of intra-queue preemption + * When false: + * stop preempt container when: all resource type <= 0 for to- + * preempt. + * This is default preemption behavior of inter-queue preemption * @return should we preempt rmContainer. If we should, deduct from * resourceToObtainByPartition */ @@ -140,7 +151,7 @@ public static boolean tryPreemptContainerAndDeductResToObtain( Map resourceToObtainByPartitions, RMContainer rmContainer, Resource clusterResource, Map> preemptMap, - Resource totalPreemptionAllowed) { + Resource totalPreemptionAllowed, boolean conservativeDRF) { ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); // We will not account resource of a container twice or more @@ -152,13 +163,57 @@ public static boolean tryPreemptContainerAndDeductResToObtain( rmContainer.getAllocatedNode()); Resource toObtainByPartition = resourceToObtainByPartitions .get(nodePartition); + if (null == toObtainByPartition) { + return false; + } + + // If a toObtain resource type == 0, set it to -1 to avoid 0 resource + // type affect following doPreemption check: isAnyMajorResourceZero + for (ResourceInformation ri : toObtainByPartition.getResources()) { + if (ri.getValue() == 0) { + ri.setValue(-1); + } + } - if (null != toObtainByPartition - && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + if (Resources.greaterThan(rc, clusterResource, toObtainByPartition, Resources.none()) && Resources.fitsIn(rc, rmContainer.getAllocatedResource(), - totalPreemptionAllowed) - && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) { + totalPreemptionAllowed)){ + + boolean doPreempt; + if (conservativeDRF) { + doPreempt = !Resources.isAnyMajorResourceZeroOrNegative(rc, + toObtainByPartition); + } else{ + // How much resource left after preemption happen. + Resource toObtainAfterPreemption = Resources.subtract(toObtainByPartition, + rmContainer.getAllocatedResource()); + + // When we want to do more aggressive preemption, we will do preemption: + // (when following conditions are all satisified + // - After preempt the container, the to-obtain should be either > 0 + // OR any major resource equals to 0. + // - The preempt of the container makes positive contribution to the + // to-obtain resource. Positive contribution means any positive + // resource type decreases. + // + // This is example of positive contribution: + // * before: <30, 10, 5>, after <20, 10, -10> + // But this not positive contribution: + // * before: <30, 10, 0>, after <30, 10, -15> + doPreempt = Resources.greaterThanOrEqual(rc, clusterResource, + toObtainAfterPreemption, Resources.none()) || Resources + .isAnyMajorResourceZero(rc, toObtainAfterPreemption); + doPreempt = doPreempt && Resources.lessThan(rc, clusterResource, + Resources + .componentwiseMax(toObtainAfterPreemption, Resources.none()), + Resources.componentwiseMax(toObtainByPartition, Resources.none())); + } + + if (!doPreempt) { + return false; + } + Resources.subtractFrom(toObtainByPartition, rmContainer.getAllocatedResource()); Resources.subtractFrom(totalPreemptionAllowed, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 748548a761e..3b2fcbb90d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -111,7 +111,7 @@ .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptionAllowed); + totalPreemptionAllowed, false); if (!preempted) { continue; } @@ -187,7 +187,7 @@ private void preemptAMContainers(Resource clusterResource, boolean preempted = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, preemptMap, - totalPreemptionAllowed); + totalPreemptionAllowed, false); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } @@ -221,7 +221,7 @@ private void preemptFrom(FiCaSchedulerApp app, // Try to preempt this container CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed); + selectedContainers, totalPreemptionAllowed, false); if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -264,7 +264,7 @@ private void preemptFrom(FiCaSchedulerApp app, // Try to preempt this container CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed); + selectedContainers, totalPreemptionAllowed, false); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 1776bd4d946..9f462a1c68c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -279,7 +279,7 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, // Once unallocated resource is 0, we can stop assigning ideal per app. if (Resources.lessThanOrEqual(rc, clusterResource, queueReassignableResource, Resources.none()) - || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) { + || Resources.isAnyMajorResourceZeroOrNegative(rc, queueReassignableResource)) { continue; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 5b6932e6403..a91fac7bd3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -230,7 +230,7 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, boolean ret = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptedResourceAllowed); + totalPreemptedResourceAllowed, true); // Subtract from respective user's resource usage once a container is // selected for preemption. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java index e9a8116f1e6..e3ca335243b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Before; import org.junit.Test; @@ -33,6 +34,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestPreemptionForQueueWithPriorities extends ProportionalCapacityPreemptionPolicyMockFramework { @@ -426,7 +428,7 @@ public void testPriorityPreemptionWithMandatoryResourceForHierarchicalOfQueues() // Preemption should first divide capacities between a / b, and b1 should // get less preemption than b2 (because b1 has higher priority) - verify(mDisp, never()).handle(argThat( + verify(mDisp, times(2)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); verify(mDisp, never()).handle(argThat( @@ -435,7 +437,7 @@ public void testPriorityPreemptionWithMandatoryResourceForHierarchicalOfQueues() verify(mDisp, never()).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(3)))); - verify(mDisp, times(2)).handle(argThat( + verify(mDisp, times(1)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(4)))); } @@ -505,4 +507,73 @@ public void testPriorityPreemptionWithMultipleResource() getAppAttemptId(3)))); } + @Test + public void test3ResourceTypesInterQueuePreemption() throws IOException { + rc = new DominantResourceCalculator(); + when(cs.getResourceCalculator()).thenReturn(rc); + + // Initialize resource map + Map riMap = new HashMap<>(); + String RESOURCE_1 = "res1"; + + // Initialize mandatory resources + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0, + ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE)); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + /** + * Queue structure is: + * + *
+     *              root
+     *           /  \  \
+     *          a    b  c
+     * 
+ * A / B / C have 33.3 / 33.3 / 33.4 resources + * Total cluster resource have mem=30, cpu=18, GPU=6 + * A uses mem=6, cpu=3, GPU=3 + * B uses mem=6, cpu=3, GPU=3 + * C is asking mem=1,cpu=1,GPU=1 + * + * We expect it can preempt from one of the jobs + */ + String labelsConfig = + "=30:18:6,true;"; + String nodesConfig = + "n1= res=30:18:6;"; // n1 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root + "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a + "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b + "-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a1 + + "(1,2:2:1,n1,,3,false);" + + "b\t" // app2 in b2 + + "(1,2:2:1,n1,,3,false)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(1)).handle( + argThat(new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(getAppAttemptId(1)))); + + riMap.remove(RESOURCE_1); + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index 67c09cd7e5d..480885ad5fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -667,24 +667,24 @@ public void testNormalizeGuaranteeWithMultipleResource() throws IOException { String queuesConfig = // guaranteed,max,used,pending "root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root - "-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a - "--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1 - "--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2 - "-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b - "--b1(=[25:5:4 100:20:10 0 20:10:4]);" + // b1 - "--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2 + "-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a + "--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1 + "--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2 + "-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b + "--b1(=[25:5:4 100:20:10 0 20:40:4]);" + // b1 + "--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2 String appsConfig= //queueName\t(priority,resource,host,expression,#repeat,reserved) "a1\t" // app1 in a1 - + "(1,8:9:1,n1,,10,false);" + - "b2\t" // app2 in b2 - + "(1,2:1,n1,,10,false)"; // 80 of y + + "(1,8:9:1,n1,,10,false);" + + "b2\t" // app2 in b2 + + "(1,2:1,n1,,10,false)"; // 80 of y buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); policy.editSchedule(); - verify(mDisp, times(7)).handle( - argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); + verify(mDisp, times(6)).handle( + argThat(new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(getAppAttemptId(1)))); riMap.remove(RESOURCE_1); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java index 0d6d350f001..66958740609 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java @@ -18,8 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Test; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -27,6 +36,12 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF extends ProportionalCapacityPreemptionPolicyMockFramework { + @Override + public void setup() { + rc = new DominantResourceCalculator(); + super.setup(); + } + @Test public void testInterQueuePreemptionWithMultipleResource() throws Exception { @@ -65,4 +80,71 @@ public void testInterQueuePreemptionWithMultipleResource() new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(2)))); } + + @Test + public void test3ResourceTypesInterQueuePreemption() throws IOException { + // Initialize resource map + Map riMap = new HashMap<>(); + String RESOURCE_1 = "res1"; + + // Initialize mandatory resources + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0, + ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE)); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + /** + * Queue structure is: + * + *
+     *              root
+     *           /  \  \
+     *          a    b  c
+     * 
+ * A / B / C have 33.3 / 33.3 / 33.4 resources + * Total cluster resource have mem=30, cpu=18, GPU=6 + * A uses mem=6, cpu=3, GPU=3 + * B uses mem=6, cpu=3, GPU=3 + * C is asking mem=1,cpu=1,GPU=1 + * + * We expect it can preempt from one of the jobs + */ + String labelsConfig = + "=30:18:6,true;"; + String nodesConfig = + "n1= res=30:18:6;"; // n1 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root + "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]);" + // a + "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]);" + // b + "-c(=[10:6:2 10:6:2 0:0:0 1:1:1])"; // c + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a1 + + "(1,2:2:1,n1,,3,false);" + + "b\t" // app2 in b2 + + "(1,2:2:1,n1,,3,false)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(1)).handle( + argThat(new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(getAppAttemptId(1)))); + + riMap.remove(RESOURCE_1); + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } }