diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index ab6d7f57483..83d73470505 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -150,4 +150,9 @@ public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { public boolean isAnyMajorResourceAboveZero(Resource resource) { return resource.getMemorySize() > 0; } + + @Override + public boolean isAnyMajorResourceGreaterThan(Resource lhs, Resource rhs) { + return lhs.getMemorySize() > rhs.getMemorySize(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 9aeb51cc2cc..261504f8ca2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -587,4 +587,19 @@ public boolean isAnyMajorResourceAboveZero(Resource resource) { } return false; } + + @Override + public boolean isAnyMajorResourceGreaterThan(Resource lhs, Resource rhs) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation lResourceInformation = lhs + .getResourceInformation(i); + ResourceInformation rResourceInformation = rhs + .getResourceInformation(i); + if (lResourceInformation.getValue() > rResourceInformation.getValue()) { + return true; + } + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 27394f73a7a..ebecdb010e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -266,4 +266,16 @@ public abstract float divide( * @return returns true if any resource is {@literal >} 0 */ public abstract boolean isAnyMajorResourceAboveZero(Resource resource); + + /** + * Check if any major resource types of left resource is greater than + * that of right resource. + * + * @param lhs left resource + * @param rhs right resource + * @return returns true if any major resource types of left resource + * is greater than that of right resource + */ + public abstract boolean isAnyMajorResourceGreaterThan(Resource lhs, + Resource rhs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 48c2c364ae9..525ea353d44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -553,4 +553,9 @@ public static Resource normalizeDown(ResourceCalculator calculator, Resource resource, Resource factor) { return calculator.normalizeDown(resource, factor); } + + public static boolean isAnyMajorResourceGreaterThan(ResourceCalculator rc, + Resource lhs, Resource rhs) { + return rc.isAnyMajorResourceGreaterThan(lhs, rhs); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index ed50effee86..259018ce4c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -92,8 +92,7 @@ public static void deductPreemptableResourcesBasedSelectedCandidates( } if (null != res) { - tq.deductActuallyToBePreempted(context.getResourceCalculator(), - tq.totalPartitionResource, res); + tq.deductActuallyToBePreempted(context.getResourceCalculator(), res); Collection tas = tq.getApps(); if (null == tas || tas.isEmpty()) { continue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 89a015e4122..c64d4e6a5c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -111,10 +111,10 @@ protected void computeIdealResourceDistribution(ResourceCalculator rc, // how much preemption is required overall Resource totPreemptionNeeded = Resource.newInstance(0, 0); for (TempQueuePerPartition t:queues) { - if (Resources.greaterThan(rc, tot_guarant, - t.getUsed(), t.idealAssigned)) { - Resources.addTo(totPreemptionNeeded, Resources - .subtract(t.getUsed(), t.idealAssigned)); + if (Resources.isAnyMajorResourceGreaterThan(rc, t.getUsed(), + t.idealAssigned)) { + Resources.addTo(totPreemptionNeeded, Resources.subtractFromNonNegative( + Resources.clone(t.getUsed()), t.idealAssigned)); } } @@ -177,7 +177,7 @@ private void calculateResToObtainByPartitionForLeafQueues( for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) { // we act only if we are violating balance by more than // maxIgnoredOverCapacity - if (Resources.greaterThan(rc, clusterResource, + if (Resources.isAnyMajorResourceGreaterThan(rc, qT.getUsed(), Resources .multiply(qT.getGuaranteed(), 1.0 + context.getMaxIgnoreOverCapacity()))) { @@ -198,16 +198,14 @@ private void calculateResToObtainByPartitionForLeafQueues( */ Resource resToObtain = qT.toBePreempted; if (!isReservedPreemptionCandidatesSelector) { - if (Resources.greaterThan(rc, clusterResource, resToObtain, - Resource.newInstance(0, 0))) { + if (rc.isAnyMajorResourceAboveZero(resToObtain)) { resToObtain = Resources.multiplyAndNormalizeUp(rc, qT.toBePreempted, context.getNaturalTerminationFactor(), Resource.newInstance(1, 1)); } } // Only add resToObtain when it >= 0 - if (Resources.greaterThan(rc, clusterResource, resToObtain, - Resources.none())) { + if (rc.isAnyMajorResourceAboveZero(resToObtain)) { if (LOG.isDebugEnabled()) { LOG.debug("Queue=" + queueName + " partition=" + qT.partition + " resource-to-obtain=" + resToObtain); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 4fb1862b887..757aaf8318c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -282,28 +282,26 @@ public void assignPreemption(float scalingFactor, ResourceCalculator rc, // guaranteed and total. We should avoid preempt from a queue if it is // already // <= its guaranteed resource. - Resource minimumQueueResource = Resources.max(rc, clusterResource, - Resources.min(rc, clusterResource, totalResource, getGuaranteed()), + Resource minimumQueueResource = Resources.componentwiseMax( + Resources.componentwiseMin(totalResource, getGuaranteed()), idealAssigned); - if (Resources.greaterThan(rc, clusterResource, usedDeductKillable, + if (Resources.isAnyMajorResourceGreaterThan(rc, usedDeductKillable, minimumQueueResource)) { - toBePreempted = Resources.multiply( - Resources.subtract(usedDeductKillable, minimumQueueResource), - scalingFactor); + Resource delta = Resources.subtractFromNonNegative( + Resources.clone(usedDeductKillable), minimumQueueResource); + toBePreempted = Resources.multiply(delta, scalingFactor); } else { toBePreempted = Resources.none(); } } public void deductActuallyToBePreempted(ResourceCalculator rc, - Resource cluster, Resource toBeDeduct) { - if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(), + Resource toBeDeduct) { + if (Resources.isAnyMajorResourceGreaterThan(rc, getActuallyToBePreempted(), toBeDeduct)) { - Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); + Resources.subtractFromNonNegative(getActuallyToBePreempted(), toBeDeduct); } - setActuallyToBePreempted(Resources.max(rc, cluster, - getActuallyToBePreempted(), Resources.none())); } void appendLogString(StringBuilder sb) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java index 14a3a9ad7d7..73e465d9e97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java @@ -173,4 +173,42 @@ public void test3ResourceTypesInterQueuePreemption() throws IOException { new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(2)))); } + + @Test + public void testResourceTypesInterQueuePreemption() throws IOException { + /* + * root + * / \ \ + * a b c + * Total cluster resource have mem=40, cpu=18 + * queue a: guaranteed=<10,10>, used=<6,10> + * queue b: guaranteed=<20,6>, used=<20,8> + * queue c: guaranteed=<10,2>, used=<0,0>, pending=<1,1> + * + * Queue c is an underserved queue, queue b overuses 2 cpu resource. + * We expect it can preempt from app2 in queue b + */ + String labelsConfig = "=40:18,true;"; + String nodesConfig = "n1= res=40:18;"; // n1 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[40:18 40:18 26:18 1:1]);" + //root + "-a(=[10:10 10:18 6:10 0:0]);" + // a + "-b(=[20:6 20:18 20:8 0:0]);" + // b + "-c(=[10:2 10:18 0:0 1:1])"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1:1,n1,,3,false);" + "b\t" // app1 in a + + "(1,1:1,n1,,3,false)"; // app2 in b + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } \ No newline at end of file