From 9fadfbc4648517d9820fe85461a487e407d7e1e5 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 18 May 2017 22:36:03 +0530 Subject: [PATCH] YARN-6471 --- .../java/org/apache/hadoop/util/StringUtils.java | 31 +++ .../hadoop/yarn/util/UnitsConversionUtil.java | 217 ++++++++++++++++++ .../util/resource/DefaultResourceCalculator.java | 6 + .../util/resource/DominantResourceCalculator.java | 7 + .../yarn/util/resource/ResourceCalculator.java | 10 + .../hadoop/yarn/util/resource/Resources.java | 5 + .../monitor/capacity/FifoCandidatesSelector.java | 9 +- .../ProportionalCapacityPreemptionPolicy.java | 11 +- .../monitor/capacity/TempQueuePerPartition.java | 16 +- .../reservation/CapacityReservationSystem.java | 6 +- .../reservation/CapacitySchedulerPlanFollower.java | 11 +- .../scheduler/AbstractResourceUsage.java | 194 ++++++++++++++++ .../scheduler/QueueResourceQuotas.java | 115 ++++++++++ .../resourcemanager/scheduler/ResourceUsage.java | 243 ++++----------------- .../scheduler/capacity/AbstractCSQueue.java | 153 ++++++++++++- .../scheduler/capacity/CSQueue.java | 63 +++++- .../scheduler/capacity/CSQueueUtils.java | 24 +- .../capacity/CapacitySchedulerConfiguration.java | 180 ++++++++++++++- .../scheduler/capacity/LeafQueue.java | 36 ++- .../scheduler/capacity/ParentQueue.java | 193 ++++++++++++++-- .../scheduler/capacity/UsersManager.java | 6 +- .../PriorityUtilizationQueueOrderingPolicy.java | 11 + .../webapp/dao/CapacitySchedulerQueueInfo.java | 15 ++ .../TestAbsoluteResourceConfiguration.java | 173 +++++++++++++++ 24 files changed, 1448 insertions(+), 287 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index 72bd171..63f4fae 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -1152,4 +1152,35 @@ public static boolean equalsIgnoreCase(String s1, String s2) { return s1.equalsIgnoreCase(s2); } + /** + *

Checks if the String contains only unicode letters.

+ * + *

null will return false. + * An empty String (length()=0) will return true.

+ * + *
+   * StringUtils.isAlpha(null)   = false
+   * StringUtils.isAlpha("")     = true
+   * StringUtils.isAlpha("  ")   = false
+   * StringUtils.isAlpha("abc")  = true
+   * StringUtils.isAlpha("ab2c") = false
+   * StringUtils.isAlpha("ab-c") = false
+   * 
+ * + * @param str the String to check, may be null + * @return true if only contains letters, and is non-null + */ + public static boolean isAlpha(String str) { + if (str == null) { + return false; + } + int sz = str.length(); + for (int i = 0; i < sz; i++) { + if (Character.isLetter(str.charAt(i)) == false) { + return false; + } + } + return true; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java new file mode 100644 index 0000000..79ee0f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.math.BigInteger; +import java.util.*; + +/** + * A util to convert values in one unit to another. Units refers to whether + * the value is expressed in pico, nano, etc. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class UnitsConversionUtil { + + /** + * Helper class for encapsulating conversion values. + */ + public static class Converter { + private long numerator; + private long denominator; + + Converter(long n, long d) { + this.numerator = n; + this.denominator = d; + } + } + + private static final String[] UNITS = + { "p", "n", "u", "m", "", "k", "M", "G", "T", "P", "Ki", "Mi", "Gi", "Ti", + "Pi" }; + private static final List SORTED_UNITS = Arrays.asList(UNITS); + public static final Set KNOWN_UNITS = createKnownUnitsSet(); + private static final Converter PICO = + new Converter(1L, 1000L * 1000L * 1000L * 1000L); + private static final Converter NANO = + new Converter(1L, 1000L * 1000L * 1000L); + private static final Converter MICRO = new Converter(1L, 1000L * 1000L); + private static final Converter MILLI = new Converter(1L, 1000L); + private static final Converter BASE = new Converter(1L, 1L); + private static final Converter KILO = new Converter(1000L, 1L); + private static final Converter MEGA = new Converter(1000L * 1000L, 1L); + private static final Converter GIGA = + new Converter(1000L * 1000L * 1000L, 1L); + private static final Converter TERA = + new Converter(1000L * 1000L * 1000L * 1000L, 1L); + private static final Converter PETA = + new Converter(1000L * 1000L * 1000L * 1000L * 1000L, 1L); + + private static final Converter KILO_BINARY = new Converter(1024L, 1L); + private static final Converter MEGA_BINARY = new Converter(1024L * 1024L, 1L); + private static final Converter GIGA_BINARY = + new Converter(1024L * 1024L * 1024L, 1L); + private static final Converter TERA_BINARY = + new Converter(1024L * 1024L * 1024L * 1024L, 1L); + private static final Converter PETA_BINARY = + new Converter(1024L * 1024L * 1024L * 1024L * 1024L, 1L); + + private static Set createKnownUnitsSet() { + Set ret = new HashSet<>(); + ret.addAll(Arrays.asList(UNITS)); + return ret; + } + + private static Converter getConverter(String unit) { + switch (unit) { + case "p": + return PICO; + case "n": + return NANO; + case "u": + return MICRO; + case "m": + return MILLI; + case "": + return BASE; + case "k": + return KILO; + case "M": + return MEGA; + case "G": + return GIGA; + case "T": + return TERA; + case "P": + return PETA; + case "Ki": + return KILO_BINARY; + case "Mi": + return MEGA_BINARY; + case "Gi": + return GIGA_BINARY; + case "Ti": + return TERA_BINARY; + case "Pi": + return PETA_BINARY; + default: + throw new IllegalArgumentException( + "Unknown unit '" + unit + "'. Known units are " + KNOWN_UNITS); + } + } + + /** + * Converts a value from one unit to another. Supported units can be obtained + * by inspecting the KNOWN_UNITS set. + * + * @param fromUnit the unit of the from value + * @param toUnit the target unit + * @param fromValue the value you wish to convert + * @return the value in toUnit + */ + public static Long convert(String fromUnit, String toUnit, Long fromValue) { + if (toUnit == null || fromUnit == null || fromValue == null) { + throw new IllegalArgumentException("One or more arguments are null"); + } + String overflowMsg = + "Converting " + fromValue + " from '" + fromUnit + "' to '" + toUnit + + "' will result in an overflow of Long"; + if (fromUnit.equals(toUnit)) { + return fromValue; + } + Converter fc = getConverter(fromUnit); + Converter tc = getConverter(toUnit); + Long numerator = fc.numerator * tc.denominator; + Long denominator = fc.denominator * tc.numerator; + Long numeratorMultiplierLimit = Long.MAX_VALUE / numerator; + if (numerator < denominator) { + if (numeratorMultiplierLimit < fromValue) { + throw new IllegalArgumentException(overflowMsg); + } + return (fromValue * numerator) / denominator; + } + if (numeratorMultiplierLimit > fromValue) { + return (numerator * fromValue) / denominator; + } + Long tmp = numerator / denominator; + if ((Long.MAX_VALUE / tmp) < fromValue) { + throw new IllegalArgumentException(overflowMsg); + } + return fromValue * tmp; + } + + /** + * Compare a value in a given unit with a value in another unit. The return + * value is equivalent to the value returned by compareTo. + * + * @param unitA first unit + * @param valueA first value + * @param unitB second unit + * @param valueB second value + * @return +1, 0 or -1 depending on whether the relationship is greater than, + * equal to or lesser than + */ + public static int compare(String unitA, Long valueA, String unitB, + Long valueB) { + if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA) + || !KNOWN_UNITS.contains(unitB)) { + throw new IllegalArgumentException("Units cannot be null"); + } + if (!KNOWN_UNITS.contains(unitA)) { + throw new IllegalArgumentException("Unknown unit '" + unitA + "'"); + } + if (!KNOWN_UNITS.contains(unitB)) { + throw new IllegalArgumentException("Unknown unit '" + unitB + "'"); + } + Converter unitAC = getConverter(unitA); + Converter unitBC = getConverter(unitB); + if (unitA.equals(unitB)) { + return valueA.compareTo(valueB); + } + int unitAPos = SORTED_UNITS.indexOf(unitA); + int unitBPos = SORTED_UNITS.indexOf(unitB); + try { + Long tmpA = valueA; + Long tmpB = valueB; + if (unitAPos < unitBPos) { + tmpB = convert(unitB, unitA, valueB); + } else { + tmpA = convert(unitA, unitB, valueA); + } + return tmpA.compareTo(tmpB); + } catch (IllegalArgumentException ie) { + BigInteger tmpA = BigInteger.valueOf(valueA); + BigInteger tmpB = BigInteger.valueOf(valueB); + if (unitAPos < unitBPos) { + tmpB = tmpB.multiply(BigInteger.valueOf(unitBC.numerator)); + tmpB = tmpB.multiply(BigInteger.valueOf(unitAC.denominator)); + tmpB = tmpB.divide(BigInteger.valueOf(unitBC.denominator)); + tmpB = tmpB.divide(BigInteger.valueOf(unitAC.numerator)); + } else { + tmpA = tmpA.multiply(BigInteger.valueOf(unitAC.numerator)); + tmpA = tmpA.multiply(BigInteger.valueOf(unitBC.denominator)); + tmpA = tmpA.divide(BigInteger.valueOf(unitAC.denominator)); + tmpA = tmpA.divide(BigInteger.valueOf(unitBC.numerator)); + } + return tmpA.compareTo(tmpB); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index ef7229c..977453d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -121,4 +121,10 @@ public boolean fitsIn(Resource cluster, Resource smaller, Resource bigger) { return smaller.getMemorySize() <= bigger.getMemorySize(); } + + @Override + public Resource normalizeDown(Resource r, Resource stepFactor) { + return Resources.createResource( + roundDown((r.getMemorySize()), stepFactor.getMemorySize())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 032aa02..f2ebbe3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -239,4 +239,11 @@ public boolean fitsIn(Resource cluster, return smaller.getMemorySize() <= bigger.getMemorySize() && smaller.getVirtualCores() <= bigger.getVirtualCores(); } + + @Override + public Resource normalizeDown(Resource r, Resource stepFactor) { + return Resources.createResource( + roundDown(r.getMemorySize(), stepFactor.getMemorySize()), + roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index a2f85b3..b0f1f72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -204,4 +204,14 @@ public abstract float divide( */ public abstract boolean fitsIn(Resource cluster, Resource smaller, Resource bigger); + + /** + * Get resource rand normalize down using step-factor + * stepFactor. + * + * @param r resource to be multiplied + * @param stepFactor factor by which to normalize down + * @return resulting normalized resource + */ + public abstract Resource normalizeDown(Resource r, Resource stepFactor); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 7020300..e6225e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -352,4 +352,9 @@ public static Resource componentwiseMax(Resource lhs, Resource rhs) { return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()), Math.max(lhs.getVirtualCores(), rhs.getVirtualCores())); } + + public static Resource normalizeDown(ResourceCalculator calculator, + Resource resource, Resource factor) { + return calculator.normalizeDown(resource, factor); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index f4d7e92..2648488 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -140,10 +140,10 @@ // Can try preempting AMContainers (still saving atmost // maxAMCapacityForThisQueue AMResource's) if more resources are // required to be preemptionCandidates from this Queue. - Resource maxAMCapacityForThisQueue = Resources.multiply( - Resources.multiply(clusterResource, - leafQueue.getAbsoluteCapacity()), - leafQueue.getMaxAMResourcePerQueuePercent()); + Resource maxAMCapacityForThisQueue = Resources + .multiply( + leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL), + leafQueue.getMaxAMResourcePerQueuePercent()); preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, @@ -199,7 +199,6 @@ private void preemptAMContainers(Resource clusterResource, * Given a target preemption for a specific application, select containers * to preempt (after unreserving all reservation for that app). */ - @SuppressWarnings("unchecked") private void preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Map resToObtainByPartition, List skippedAMContainerlist, Resource skippedAMSize, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index dc6f1c2..f7f2374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -242,7 +243,6 @@ public synchronized void editSchedule() { } } - @SuppressWarnings("unchecked") private void preemptOrkillSelectedContainerAfterWait( Map> selectedCandidates, long currentTime) { @@ -463,6 +463,13 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); boolean preemptionDisabled = curQueue.getPreemptionDisabled(); + QueueResourceQuotas queueResourceQuotas = curQueue + .getQueueResourceQuotas(); + Resource effMinRes = queueResourceQuotas + .getEffectiveMinResource(partitionToLookAt); + Resource effMaxRes = queueResourceQuotas + .getEffectiveMaxResource(partitionToLookAt); + Resource current = Resources .clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); Resource killable = Resources.none(); @@ -488,7 +495,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, ret = new TempQueuePerPartition(queueName, current, preemptionDisabled, partitionToLookAt, killable, absCap, absMaxCap, partitionResource, - reserved, curQueue); + reserved, curQueue, effMinRes, effMaxRes); if (curQueue instanceof ParentQueue) { String configuredOrderingPolicy = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 7eab015..5979220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -59,10 +59,14 @@ int relativePriority = 0; TempQueuePerPartition parent = null; + private Resource effMinRes; + private Resource effMaxRes; + TempQueuePerPartition(String queueName, Resource current, boolean preemptionDisabled, String partition, Resource killable, float absCapacity, float absMaxCapacity, Resource totalPartitionResource, - Resource reserved, CSQueue queue) { + Resource reserved, CSQueue queue, Resource effMinRes, + Resource effMaxRes) { super(queueName, current, Resource.newInstance(0, 0), reserved, Resource.newInstance(0, 0)); @@ -89,6 +93,8 @@ this.absCapacity = absCapacity; this.absMaxCapacity = absMaxCapacity; this.totalPartitionResource = totalPartitionResource; + this.effMinRes = effMinRes; + this.effMaxRes = effMaxRes; } public void setLeafQueue(LeafQueue l) { @@ -171,10 +177,18 @@ Resource offer(Resource avail, ResourceCalculator rc, } public Resource getGuaranteed() { + if(!effMinRes.equals(Resources.none())) { + return Resources.clone(effMinRes); + } + return Resources.multiply(totalPartitionResource, absCapacity); } public Resource getMax() { + if(!effMaxRes.equals(Resources.none())) { + return Resources.clone(effMaxRes); + } + return Resources.multiply(totalPartitionResource, absMaxCapacity); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java index be0a708..3df2b44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -93,10 +94,9 @@ protected String getPlanQueuePath(String planQueueName) { @Override protected Resource getPlanQueueCapacity(String planQueueName) { Resource minAllocation = getMinAllocation(); - ResourceCalculator rescCalc = getResourceCalculator(); CSQueue planQueue = capScheduler.getQueue(planQueueName); - return rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(), - planQueue.getAbsoluteCapacity(), minAllocation); + return planQueue.getEffectiveCapacityByNormalizeDown( + RMNodeLabelsManager.NO_LABEL, minAllocation); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 551f075..558062a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; @@ -131,9 +132,9 @@ protected void createDefaultReservationQueue( @Override protected Resource getPlanResources( Plan plan, Queue queue, Resource clusterResources) { - PlanQueue planQueue = (PlanQueue)queue; - float planAbsCap = planQueue.getAbsoluteCapacity(); - Resource planResources = Resources.multiply(clusterResources, planAbsCap); + PlanQueue planQueue = (PlanQueue) queue; + Resource planResources = planQueue + .getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL); plan.setTotalCapacity(planResources); return planResources; } @@ -144,8 +145,8 @@ protected Resource getReservationQueueResourceIfExists(Plan plan, CSQueue resQueue = cs.getQueue(reservationId.toString()); Resource reservationResource = null; if (resQueue != null) { - reservationResource = Resources.multiply(cs.getClusterResource(), - resQueue.getAbsoluteCapacity()); + reservationResource = resQueue.getEffectiveCapacity( + RMNodeLabelsManager.NO_LABEL); } return reservationResource; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java new file mode 100644 index 0000000..89360c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * This class can be used to track resource usage in queue/user/app. + * + * And it is thread-safe + */ +public class AbstractResourceUsage { + protected ReadLock readLock; + protected WriteLock writeLock; + protected Map usages; + // short for no-label :) + private static final String NL = CommonNodeLabelsManager.NO_LABEL; + + public AbstractResourceUsage() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + usages = new HashMap(); + usages.put(NL, new UsageByLabel(NL)); + } + + // Usage enum here to make implement cleaner + public enum ResourceType { + // CACHED_USED and CACHED_PENDING may be read by anyone, but must only + // be written by ordering policies + USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING( + 5), AMLIMIT(6), MIN_RESOURCE( + 7), MAX_RESOURCE(8), EFF_MIN_RESOURCE(9), EFF_MAX_RESOURCE(10); + + private int idx; + + private ResourceType(int value) { + this.idx = value; + } + } + + public static class UsageByLabel { + // usage by label, contains all UsageType + private Resource[] resArr; + + public UsageByLabel(String label) { + resArr = new Resource[ResourceType.values().length]; + for (int i = 0; i < resArr.length; i++) { + resArr[i] = Resource.newInstance(0, 0); + }; + } + + public Resource getUsed() { + return resArr[ResourceType.USED.idx]; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{used=" + resArr[0] + "%, "); + sb.append("pending=" + resArr[1] + "%, "); + sb.append("am_used=" + resArr[2] + "%, "); + sb.append("reserved=" + resArr[3] + "%}"); + return sb.toString(); + } + } + + private static Resource normalize(Resource res) { + if (res == null) { + return Resources.none(); + } + return res; + } + + protected Resource _get(String label, ResourceType type) { + if (label == null) { + label = RMNodeLabelsManager.NO_LABEL; + } + + try { + readLock.lock(); + UsageByLabel usage = usages.get(label); + if (null == usage) { + return Resources.none(); + } + return normalize(usage.resArr[type.idx]); + } finally { + readLock.unlock(); + } + } + + protected Resource _getAll(ResourceType type) { + try { + readLock.lock(); + Resource allOfType = Resources.createResource(0); + for (Map.Entry usageEntry : usages.entrySet()) { + //all usages types are initialized + Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]); + } + return allOfType; + } finally { + readLock.unlock(); + } + } + + private UsageByLabel getAndAddIfMissing(String label) { + if (label == null) { + label = RMNodeLabelsManager.NO_LABEL; + } + if (!usages.containsKey(label)) { + UsageByLabel u = new UsageByLabel(label); + usages.put(label, u); + return u; + } + + return usages.get(label); + } + + protected void _set(String label, ResourceType type, Resource res) { + try { + writeLock.lock(); + UsageByLabel usage = getAndAddIfMissing(label); + usage.resArr[type.idx] = res; + } finally { + writeLock.unlock(); + } + } + + protected void _inc(String label, ResourceType type, Resource res) { + try { + writeLock.lock(); + UsageByLabel usage = getAndAddIfMissing(label); + Resources.addTo(usage.resArr[type.idx], res); + } finally { + writeLock.unlock(); + } + } + + protected void _dec(String label, ResourceType type, Resource res) { + try { + writeLock.lock(); + UsageByLabel usage = getAndAddIfMissing(label); + Resources.subtractFrom(usage.resArr[type.idx], res); + } finally { + writeLock.unlock(); + } + } + + @Override + public String toString() { + try { + readLock.lock(); + return usages.toString(); + } finally { + readLock.unlock(); + } + } + + public Set getNodePartitionsSet() { + try { + readLock.lock(); + return usages.keySet(); + } finally { + readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java new file mode 100644 index 0000000..38b035f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +/** + * QueueResourceQuotas by Labels for following fields by label + * - EFFECTIVE_MIN_CAPACITY + * - EFFECTIVE_MAX_CAPACITY + * This class can be used to track resource usage in queue/user/app. + * + * And it is thread-safe + */ +public class QueueResourceQuotas extends AbstractResourceUsage { + // short for no-label :) + private static final String NL = CommonNodeLabelsManager.NO_LABEL; + + public QueueResourceQuotas() { + super(); + } + + /* + * Configured Minimum Resource + */ + public Resource getConfiguredMinResource() { + return _get(NL, ResourceType.MIN_RESOURCE); + } + + public Resource getConfiguredMinResource(String label) { + return _get(label, ResourceType.MIN_RESOURCE); + } + + public void setConfiguredMinResource(String label, Resource res) { + _set(label, ResourceType.MIN_RESOURCE, res); + } + + public void setConfiguredMinResource(Resource res) { + _set(NL, ResourceType.MIN_RESOURCE, res); + } + + /* + * Configured Maximum Resource + */ + public Resource getConfiguredMaxResource() { + return getConfiguredMaxResource(NL); + } + + public Resource getConfiguredMaxResource(String label) { + return _get(label, ResourceType.EFF_MAX_RESOURCE); + } + + public void setConfiguredMaxResource(Resource res) { + setConfiguredMaxResource(NL, res); + } + + public void setConfiguredMaxResource(String label, Resource res) { + _set(label, ResourceType.EFF_MAX_RESOURCE, res); + } + + /* + * Effective Minimum Resource + */ + public Resource getEffectiveMinResource() { + return _get(NL, ResourceType.EFF_MIN_RESOURCE); + } + + public Resource getEffectiveMinResource(String label) { + return _get(label, ResourceType.EFF_MIN_RESOURCE); + } + + public void setEffectiveMinResource(String label, Resource res) { + _set(label, ResourceType.EFF_MIN_RESOURCE, res); + } + + public void setEffectiveMinResource(Resource res) { + _set(NL, ResourceType.EFF_MIN_RESOURCE, res); + } + + /* + * Effective Maximum Resource + */ + public Resource getEffectiveMaxResource() { + return getEffectiveMaxResource(NL); + } + + public Resource getEffectiveMaxResource(String label) { + return _get(label, ResourceType.EFF_MAX_RESOURCE); + } + + public void setEffectiveMaxResource(Resource res) { + setEffectiveMaxResource(NL, res); + } + + public void setEffectiveMaxResource(String label, Resource res) { + _set(label, ResourceType.EFF_MAX_RESOURCE, res); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 2857379..711a468 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -18,17 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import java.util.HashMap; -import java.util.Map; import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -40,61 +33,12 @@ * * And it is thread-safe */ -public class ResourceUsage { - private ReadLock readLock; - private WriteLock writeLock; - private Map usages; +public class ResourceUsage extends AbstractResourceUsage { // short for no-label :) private static final String NL = CommonNodeLabelsManager.NO_LABEL; public ResourceUsage() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - - usages = new HashMap(); - usages.put(NL, new UsageByLabel(NL)); - } - - // Usage enum here to make implement cleaner - private enum ResourceType { - //CACHED_USED and CACHED_PENDING may be read by anyone, but must only - //be written by ordering policies - USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), - CACHED_PENDING(5), AMLIMIT(6); - - private int idx; - - private ResourceType(int value) { - this.idx = value; - } - } - - private static class UsageByLabel { - // usage by label, contains all UsageType - private Resource[] resArr; - - public UsageByLabel(String label) { - resArr = new Resource[ResourceType.values().length]; - for (int i = 0; i < resArr.length; i++) { - resArr[i] = Resource.newInstance(0, 0); - }; - } - - public Resource getUsed() { - return resArr[ResourceType.USED.idx]; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{used=" + resArr[0] + "%, "); - sb.append("pending=" + resArr[1] + "%, "); - sb.append("am_used=" + resArr[2] + "%, "); - sb.append("reserved=" + resArr[3] + "%}"); - sb.append("am_limit=" + resArr[6] + "%, "); - return sb.toString(); - } + super(); } /* @@ -108,22 +52,6 @@ public Resource getUsed(String label) { return _get(label, ResourceType.USED); } - public Resource getCachedUsed() { - return _get(NL, ResourceType.CACHED_USED); - } - - public Resource getCachedUsed(String label) { - return _get(label, ResourceType.CACHED_USED); - } - - public Resource getCachedPending() { - return _get(NL, ResourceType.CACHED_PENDING); - } - - public Resource getCachedPending(String label) { - return _get(label, ResourceType.CACHED_PENDING); - } - public void incUsed(String label, Resource res) { _inc(label, ResourceType.USED, res); } @@ -144,7 +72,7 @@ public void setUsed(Resource res) { setUsed(NL, res); } - public void copyAllUsed(ResourceUsage other) { + public void copyAllUsed(AbstractResourceUsage other) { try { writeLock.lock(); for (Entry entry : other.usages.entrySet()) { @@ -159,22 +87,6 @@ public void setUsed(String label, Resource res) { _set(label, ResourceType.USED, res); } - public void setCachedUsed(String label, Resource res) { - _set(label, ResourceType.CACHED_USED, res); - } - - public void setCachedUsed(Resource res) { - _set(NL, ResourceType.CACHED_USED, res); - } - - public void setCachedPending(String label, Resource res) { - _set(label, ResourceType.CACHED_PENDING, res); - } - - public void setCachedPending(Resource res) { - _set(NL, ResourceType.CACHED_PENDING, res); - } - /* * Pending */ @@ -280,6 +192,47 @@ public void setAMUsed(String label, Resource res) { _set(label, ResourceType.AMUSED, res); } + public Resource getAllPending() { + return _getAll(ResourceType.PENDING); + } + + public Resource getAllUsed() { + return _getAll(ResourceType.USED); + } + + // Cache Used + public Resource getCachedUsed() { + return _get(NL, ResourceType.CACHED_USED); + } + + public Resource getCachedUsed(String label) { + return _get(label, ResourceType.CACHED_USED); + } + + public Resource getCachedPending() { + return _get(NL, ResourceType.CACHED_PENDING); + } + + public Resource getCachedPending(String label) { + return _get(label, ResourceType.CACHED_PENDING); + } + + public void setCachedUsed(String label, Resource res) { + _set(label, ResourceType.CACHED_USED, res); + } + + public void setCachedUsed(Resource res) { + _set(NL, ResourceType.CACHED_USED, res); + } + + public void setCachedPending(String label, Resource res) { + _set(label, ResourceType.CACHED_PENDING, res); + } + + public void setCachedPending(Resource res) { + _set(NL, ResourceType.CACHED_PENDING, res); + } + /* * AM-Resource Limit */ @@ -315,95 +268,6 @@ public void setAMLimit(String label, Resource res) { _set(label, ResourceType.AMLIMIT, res); } - private static Resource normalize(Resource res) { - if (res == null) { - return Resources.none(); - } - return res; - } - - private Resource _get(String label, ResourceType type) { - if (label == null) { - label = RMNodeLabelsManager.NO_LABEL; - } - - try { - readLock.lock(); - UsageByLabel usage = usages.get(label); - if (null == usage) { - return Resources.none(); - } - return normalize(usage.resArr[type.idx]); - } finally { - readLock.unlock(); - } - } - - private Resource _getAll(ResourceType type) { - try { - readLock.lock(); - Resource allOfType = Resources.createResource(0); - for (Map.Entry usageEntry : usages.entrySet()) { - //all usages types are initialized - Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]); - } - return allOfType; - } finally { - readLock.unlock(); - } - } - - public Resource getAllPending() { - return _getAll(ResourceType.PENDING); - } - - public Resource getAllUsed() { - return _getAll(ResourceType.USED); - } - - private UsageByLabel getAndAddIfMissing(String label) { - if (label == null) { - label = RMNodeLabelsManager.NO_LABEL; - } - if (!usages.containsKey(label)) { - UsageByLabel u = new UsageByLabel(label); - usages.put(label, u); - return u; - } - - return usages.get(label); - } - - private void _set(String label, ResourceType type, Resource res) { - try { - writeLock.lock(); - UsageByLabel usage = getAndAddIfMissing(label); - usage.resArr[type.idx] = res; - } finally { - writeLock.unlock(); - } - } - - private void _inc(String label, ResourceType type, Resource res) { - try { - writeLock.lock(); - UsageByLabel usage = getAndAddIfMissing(label); - Resources.addTo(usage.resArr[type.idx], res); - } finally { - writeLock.unlock(); - } - } - - private void _dec(String label, ResourceType type, Resource res) { - try { - writeLock.lock(); - UsageByLabel usage = getAndAddIfMissing(label); - Resources.subtractFrom(usage.resArr[type.idx], res); - } finally { - writeLock.unlock(); - } - } - public Resource getCachedDemand(String label) { try { readLock.lock(); @@ -415,23 +279,4 @@ public Resource getCachedDemand(String label) { readLock.unlock(); } } - - @Override - public String toString() { - try { - readLock.lock(); - return usages.toString(); - } finally { - readLock.unlock(); - } - } - - public Set getNodePartitionsSet() { - try { - readLock.lock(); - return usages.keySet(); - } finally { - readLock.unlock(); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 6c141a1..b98fb62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -85,6 +87,7 @@ final ResourceCalculator resourceCalculator; Set accessibleLabels; + Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; @@ -100,6 +103,14 @@ // etc. QueueCapacities queueCapacities; + QueueResourceQuotas queueResourceQuotas; + + protected enum CapacityConfigType { + NONE, PERCENTAGE, ABSOLUTE_RESOURCE + }; + protected CapacityConfigType capacityConfigType = + CapacityConfigType.NONE; + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; @@ -136,6 +147,9 @@ public AbstractCSQueue(CapacitySchedulerContext cs, // initialize QueueCapacities queueCapacities = new QueueCapacities(parent == null); + // initialize queueResourceQuotas + queueResourceQuotas = new QueueResourceQuotas(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -266,6 +280,10 @@ void setupQueueConfigs(Resource clusterResource) this.defaultLabelExpression = csContext.getConfiguration().getDefaultNodeLabelExpression( getQueuePath()); + this.resourceTypes = new HashSet(); + for (AbsoluteResourceType type : AbsoluteResourceType.values()) { + resourceTypes.add(type.toString().toLowerCase()); + } // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { @@ -282,6 +300,11 @@ void setupQueueConfigs(Resource clusterResource) // After we setup labels, we can setup capacities setupConfigurableCapacities(); + // Also fetch minimum/maximum resource constraint for this queue if + // configured. + capacityConfigType = CapacityConfigType.NONE; + updateConfigurableResourceRequirement(getQueuePath(), clusterResource); + this.maximumAllocation = csContext.getConfiguration().getMaximumAllocationPerQueue( getQueuePath()); @@ -337,6 +360,87 @@ void setupQueueConfigs(Resource clusterResource) } } + protected void updateConfigurableResourceRequirement(String queuePath, + Resource clusterResource) { + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + Set configuredNodelabels = conf.getConfiguredNodeLabels(queuePath); + + for (String label : configuredNodelabels) { + Resource minResource = conf.getMinimumResourceRequirement(label, + queuePath, resourceTypes); + Resource maxResource = conf.getMaximumResourceRequirement(label, + queuePath, resourceTypes); + + if (this.capacityConfigType.equals(CapacityConfigType.NONE)) { + this.capacityConfigType = (!minResource.equals(Resources.none()) + && queueCapacities.getAbsoluteCapacity(label) == 0f) + ? CapacityConfigType.ABSOLUTE_RESOURCE + : CapacityConfigType.PERCENTAGE; + LOG.info("capacityConfigType is updated as '" + capacityConfigType + + "' for queue '" + getQueueName() + "' with minResource = " + + minResource); + } + + validateAbsoluteVsPercentageCapacityConfig(minResource); + + // If min resource for a resource type is greater than its max resource, + // throw exception to handle such error configs. + if (!maxResource.equals(Resources.none()) && Resources.greaterThan( + resourceCalculator, clusterResource, minResource, maxResource)) { + throw new IllegalArgumentException("Min resource configuration " + + minResource + " is greater than its max value:" + maxResource + + " in queue:" + getQueueName()); + } + + // If parent's max resource is lesser to a specific child's max + // resource, throw exception to handle such error configs. + if (parent != null) { + Resource parentMaxRes = parent.getQueueResourceQuotas() + .getConfiguredMaxResource(label); + if (Resources.greaterThan(resourceCalculator, clusterResource, + parentMaxRes, Resources.none())) { + if (Resources.greaterThan(resourceCalculator, clusterResource, + maxResource, parentMaxRes)) { + throw new IllegalArgumentException("Max resource configuration " + + maxResource + " is greater than parents max value:" + + parentMaxRes + " in queue:" + getQueueName()); + } + + // If child's max resource is not set, but its parent max resource is + // set, we must set child max resource to its parent's. + if (maxResource.equals(Resources.none()) + && !minResource.equals(Resources.none())) { + maxResource = Resources.clone(parentMaxRes); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Updating absolute resource configuration for queue:" + + getQueueName() + " as minResource=" + minResource + + " and maxResource=" + maxResource); + } + + queueResourceQuotas.setConfiguredMinResource(label, minResource); + queueResourceQuotas.setConfiguredMaxResource(label, maxResource); + } + } + + private void validateAbsoluteVsPercentageCapacityConfig(Resource minResource) { + CapacityConfigType localType = CapacityConfigType.PERCENTAGE; + if (!minResource.equals(Resources.none())) { + localType = CapacityConfigType.ABSOLUTE_RESOURCE; + } + + if (!queueName.equals("root") + && !this.capacityConfigType.equals(localType)) { + throw new IllegalArgumentException("Queue '" + getQueueName() + + "' should use either percentage based capacity" + + "configuration or absolute resource. localType=" + localType + + ", and capacityConfigType=" + capacityConfigType); + } + } + private void initializeQueueState(QueueState previousState, QueueState configuredState, QueueState parentState) { // verify that we can not any value for State other than RUNNING/STOPPED @@ -528,6 +632,11 @@ public ResourceUsage getQueueResourceUsage() { } @Override + public QueueResourceQuotas getQueueResourceQuotas() { + return queueResourceQuotas; + } + + @Override public ReentrantReadWriteLock.ReadLock getReadLock() { return readLock; } @@ -577,7 +686,7 @@ private Resource getCurrentLimitResource(String nodePartition, * limit-set-by-parent) */ Resource queueMaxResource = - getQueueMaxResource(nodePartition, clusterResource); + getQueueMaxResource(nodePartition); return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); @@ -590,11 +699,8 @@ private Resource getCurrentLimitResource(String nodePartition, return Resources.none(); } - Resource getQueueMaxResource(String nodePartition, Resource clusterResource) { - return Resources.multiplyAndNormalizeDown(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), - queueCapacities.getAbsoluteMaximumCapacity(nodePartition), - minimumAllocation); + Resource getQueueMaxResource(String nodePartition) { + return getEffectiveMaxCapacityByNormalizeDown(nodePartition, minimumAllocation); } boolean canAssignToThisQueue(Resource clusterResource, @@ -750,7 +856,7 @@ public void incUsedResource(String nodeLabel, Resource resourceToInc, queueUsage.incUsed(nodeLabel, resourceToInc); CSQueueUtils.updateUsedCapacity(resourceCalculator, labelManager.getResourceByLabel(nodeLabel, Resources.none()), - nodeLabel, this); + Resources.none(), nodeLabel, this); if (null != parent) { parent.incUsedResource(nodeLabel, resourceToInc, null); } @@ -766,7 +872,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec, queueUsage.decUsed(nodeLabel, resourceToDec); CSQueueUtils.updateUsedCapacity(resourceCalculator, labelManager.getResourceByLabel(nodeLabel, Resources.none()), - nodeLabel, this); + Resources.none(), nodeLabel, this); if (null != parent) { parent.decUsedResource(nodeLabel, resourceToDec, null); } @@ -872,7 +978,7 @@ public boolean accept(Resource cluster, Resource maxResourceLimit; if (allocation.getSchedulingMode() == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxResourceLimit = getQueueMaxResource(partition, cluster); + maxResourceLimit = getQueueMaxResource(partition); } else{ maxResourceLimit = labelManager.getResourceByLabel( schedulerContainer.getNodePartition(), cluster); @@ -956,4 +1062,33 @@ protected void appFinished() { public Priority getPriority() { return this.priority; } + + @Override + public CapacityConfigType getCapacityConfigType() { + return capacityConfigType; + } + + @Override + public Resource getEffectiveCapacityByNormalizeDown(String label, + Resource normalizeDown) { + return Resources.normalizeDown(resourceCalculator, + queueResourceQuotas.getEffectiveMinResource(label), normalizeDown); + } + + @Override + public Resource getEffectiveMaxCapacityByNormalizeDown(String label, + Resource normalizeDown) { + return Resources.normalizeDown(resourceCalculator, + queueResourceQuotas.getEffectiveMaxResource(label), normalizeDown); + } + + @Override + public Resource getEffectiveCapacity(String label) { + return Resources.clone(queueResourceQuotas.getEffectiveMinResource(label)); + } + + @Override + public Resource getEffectiveMaxCapacity(String label) { + return Resources.clone(queueResourceQuotas.getEffectiveMaxResource(label)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index c6726ec..66c20e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -36,21 +36,20 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; /** * CSQueue represents a node in the tree of @@ -350,4 +349,62 @@ public void validateSubmitApplication(ApplicationId applicationId, * @return queue priority */ Priority getPriority(); + + /** + * Get QueueResourceQuotas associated with each queue. + * @return QueueResourceQuotas + */ + public QueueResourceQuotas getQueueResourceQuotas(); + + /** + * Get CapacityConfigType as PERCENTAGE or ABSOLUTE_RESOURCE + * @return CapacityConfigType + */ + public CapacityConfigType getCapacityConfigType(); + + /** + * Get effective capacity of queue. If min/max resource is configured, + * preference will be given to absolute configuration over normal capacity. + * Also round down the result to normalizeDown. + * + * @param label + * partition + * @param normalizeDown + * Round Off delta + * @return effective queue capacity + */ + Resource getEffectiveCapacityByNormalizeDown(String label, + Resource normalizeDown); + + /** + * Get effective max capacity of queue. If min/max resource is configured, + * preference will be given to absolute configuration over normal capacity. + * Also round down the result to normalizeDown. + * + * @param label + * partition + * @param normalizeDown + * Round Off delta + * @return effective max queue capacity + */ + Resource getEffectiveMaxCapacityByNormalizeDown(String label, + Resource normalizeDown); + + /** + * Get effective capacity of queue for default label. + * + * @param label + * partition + * @return effective queue capacity for default label. + */ + Resource getEffectiveCapacity(String label); + + /** + * Get effective max capacity of queue for default label. + * + * @param label + * partition + * @return effective max queue capacity for default label. + */ + Resource getEffectiveMaxCapacity(String label); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index ba22541..08b19ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -150,7 +150,7 @@ private static void loadCapacitiesByLabelsFromConf(String queuePath, } } } - + // Set absolute capacities for {capacity, maximum-capacity} private static void updateAbsoluteCapacitiesByNodeLabels( QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { @@ -180,8 +180,8 @@ private static void updateAbsoluteCapacitiesByNodeLabels( * used resource for all partitions of this queue. */ public static void updateUsedCapacity(final ResourceCalculator rc, - final Resource totalPartitionResource, String nodePartition, - AbstractCSQueue childQueue) { + final Resource totalPartitionResource, Resource clusterResource, + String nodePartition, AbstractCSQueue childQueue) { QueueCapacities queueCapacities = childQueue.getQueueCapacities(); CSQueueMetrics queueMetrics = childQueue.getMetrics(); ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); @@ -193,11 +193,8 @@ public static void updateUsedCapacity(final ResourceCalculator rc, if (Resources.greaterThan(rc, totalPartitionResource, totalPartitionResource, Resources.none())) { - // queueGuaranteed = totalPartitionedResource * - // absolute_capacity(partition) - Resource queueGuranteedResource = - Resources.multiply(totalPartitionResource, - queueCapacities.getAbsoluteCapacity(nodePartition)); + Resource queueGuranteedResource = childQueue + .getEffectiveCapacity(nodePartition); // make queueGuranteed >= minimum_allocation to avoid divided by 0. queueGuranteedResource = @@ -248,9 +245,7 @@ private static Resource getMaxAvailableResourceToQueue( for (String partition : nodeLabels) { // Calculate guaranteed resource for a label in a queue by below logic. // (total label resource) * (absolute capacity of label in that queue) - Resource queueGuranteedResource = Resources.multiply(nlm - .getResourceByLabel(partition, cluster), queue.getQueueCapacities() - .getAbsoluteCapacity(partition)); + Resource queueGuranteedResource = queue.getEffectiveCapacity(partition); // Available resource in queue for a specific label will be calculated as // {(guaranteed resource for a label in a queue) - @@ -289,15 +284,14 @@ public static void updateQueueStatistics( ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); if (nodePartition == null) { - for (String partition : Sets.union( - queueCapacities.getNodePartitionsSet(), + for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), queueResourceUsage.getNodePartitionsSet())) { updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), - partition, childQueue); + cluster, partition, childQueue); } } else { updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), - nodePartition, childQueue); + cluster, nodePartition, childQueue); } // Update queue metrics w.r.t node labels. In a generic way, we can diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 9fb92ec..1b05bf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -60,6 +61,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.Set; import java.util.StringTokenizer; @@ -292,6 +295,21 @@ @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + /** Configuring absolute min/max resources in a queue **/ + @Private + public static final String MINIMUM_RESOURCE = "min-resource"; + + @Private + public static final String MAXIMUM_RESOURCE = "max-resource"; + + public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores"; + + public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)"; + + public enum AbsoluteResourceType { + MEMORY, VCORES; + } + AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { @@ -369,7 +387,7 @@ public void setMaximumApplicationMasterResourcePerQueuePercent(String queue, public float getNonLabeledQueueCapacity(String queue) { float capacity = queue.equals("root") ? 100.0f : getFloat( - getQueuePrefix(queue) + CAPACITY, UNDEFINED); + getQueuePrefix(queue) + CAPACITY, 0f); if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) { throw new IllegalArgumentException("Illegal " + "capacity of " + capacity + " for queue " + queue); @@ -1404,4 +1422,164 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation); } + + public static String getUnits(String resourceValue) { + String units; + for (int i = 0; i < resourceValue.length(); i++) { + if (Character.isAlphabetic(resourceValue.charAt(i))) { + units = resourceValue.substring(i); + if (StringUtils.isAlpha(units)) { + return units; + } + } + } + return ""; + } + + /** + * Get absolute minimum resource requirement for a queue. + * + * @param label + * NodeLabel + * @param queue + * queue path + * @param resourceTypes + * Resource types + * @return ResourceInformation + */ + public Resource getMinimumResourceRequirement(String label, String queue, + Set resourceTypes) { + return internalGetLabeledResourceRequirementForQueue(queue, + label, resourceTypes, MINIMUM_RESOURCE); + } + + /** + * Get absolute maximum resource requirement for a queue. + * + * @param label + * NodeLabel + * @param queue + * queue path + * @param resourceTypes + * Resource types + * @return Resource + */ + public Resource getMaximumResourceRequirement(String label, String queue, + Set resourceTypes) { + return internalGetLabeledResourceRequirementForQueue(queue, + label, resourceTypes, MAXIMUM_RESOURCE); + } + + @VisibleForTesting + public void setMinimumResourceRequirement(String label, String queue, + Resource resource) { + updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE); + } + + @VisibleForTesting + public void setMaximumResourceRequirement(String label, String queue, + Resource resource) { + updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE); + } + + private void updateMinMaxResourceToConf(String label, String queue, + Resource resource, String type) { + if (queue.equals("root")) { + throw new IllegalArgumentException( + "Cannot set resource, root queue will take 100% of cluster capacity"); + } + + StringBuilder resourceString = new StringBuilder(); + resourceString + .append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "=" + + resource.getMemorySize() + "," + + AbsoluteResourceType.VCORES.toString().toLowerCase() + "=" + + resource.getVirtualCores() + "]"); + + String prefix = getQueuePrefix(queue) + type; + if (!label.isEmpty()) { + prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + + DOT + type; + } + set(prefix, resourceString.toString()); + } + + private Resource internalGetLabeledResourceRequirementForQueue(String queue, + String label, Set resourceTypes, String suffix) { + String propertyName = getNodeLabelPrefix(queue, label) + suffix; + String resourceString = get(propertyName); + if (resourceString == null || resourceString.isEmpty()) { + return Resources.none(); + } + + // Define resource here. + Resource resource = Resource.newInstance(0l, 0); + Matcher matcher = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE) + .matcher(resourceString); + + /* + * Absolute resource configuration for a queue will be grouped by "[]". + * Syntax of absolute resource config could be like below + * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores". + */ + if (matcher.find()) { + // Get the sub-group. + String subGroup = matcher.group(1); + if (subGroup.trim().isEmpty()) { + return Resources.none(); + } + + for (String kvPair : subGroup.trim().split(",")) { + String[] splits = kvPair.split("="); + + // Ensure that each sub string is key value pair separated by '='. + if (splits != null && splits.length > 1) { + updateResourceValuesFromConfig(resourceTypes, resource, splits); + } + } + } + + // Memory has to be configured always. + if (resource.getMemorySize() == 0l) { + return Resources.none(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + resource); + } + return resource; + } + + private void updateResourceValuesFromConfig(Set resourceTypes, + Resource resource, String[] splits) { + + // If key is not a valid type, skip it. + if (!resourceTypes.contains(splits[0])) { + return; + } + + String units = getUnits(splits[1]); + long resourceValue = Long + .valueOf(splits[1].substring(0, splits[1].length() - units.length())); + + // Convert all incoming units to MB if units is configured. + if (!units.isEmpty()) { + resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); + } + + // map it based on key. + AbsoluteResourceType resType = AbsoluteResourceType + .valueOf(StringUtils.toUpperCase(splits[0].trim())); + switch (resType) { + case MEMORY : + resource.setMemorySize(resourceValue); + break; + case VCORES : + resource.setVirtualCores((int) resourceValue); + break; + default : + break; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9059ef0..e32c49c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -43,12 +43,10 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -56,7 +54,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -638,12 +635,8 @@ public Resource getUserAMResourceLimitPerPartition( float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); - Resource queuePartitionResource = Resources - .multiplyAndNormalizeUp(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, - lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), - minimumAllocation); + Resource queuePartitionResource = getEffectiveCapacityByNormalizeDown( + nodePartition, minimumAllocation); Resource userAMLimit = Resources.multiplyAndNormalizeUp( resourceCalculator, queuePartitionResource, @@ -672,11 +665,8 @@ public Resource calculateAndGetAMResourceLimitPerPartition( * non-labeled), * with per-partition am-resource-percent to get the max am * resource limit for this queue and partition. */ - Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), - minimumAllocation); + Resource queuePartitionResource = getEffectiveCapacityByNormalizeDown( + nodePartition, minimumAllocation); Resource queueCurrentLimit = Resources.none(); // For non-labeled partition, we need to consider the current queue @@ -930,6 +920,14 @@ private FiCaSchedulerApp getApplication( private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { // Set preemption-allowed: // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues + if (!queueResourceQuotas.getEffectiveMinResource(nodePartition) + .equals(Resources.none())) { + limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator, + csContext.getClusterResource(), queueUsage.getUsed(nodePartition), + queueResourceQuotas.getEffectiveMinResource(nodePartition))); + return; + } + float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition); float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); @@ -1278,7 +1276,7 @@ private Resource getHeadroom(User user, currentPartitionResourceLimit = partition.equals(RMNodeLabelsManager.NO_LABEL) ? currentPartitionResourceLimit - : getQueueMaxResource(partition, clusterResource); + : getQueueMaxResource(partition); Resource headroom = Resources.componentwiseMin( Resources.subtract(userLimitResource, user.getUsed(partition)), @@ -1647,12 +1645,8 @@ private void updateCurrentResourceLimits( // this. So need cap limits by queue's max capacity here. this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit()); - Resource queueMaxResource = - Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager - .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), - queueCapacities - .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), - minimumAllocation); + Resource queueMaxResource = getEffectiveMaxCapacityByNormalizeDown( + RMNodeLabelsManager.NO_LABEL, minimumAllocation); this.cachedResourceLimitsForHeadroom.setLimit(Resources.min( resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 91fedbc..8a3f828 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; @@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; @@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; @@ -69,6 +68,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; @Private @Evolving @@ -163,31 +163,78 @@ void setChildQueues(Collection childQueues) { writeLock.lock(); // Validate float childCapacities = 0; + Resource minResDefaultLabel = Resources.createResource(0, 0); for (CSQueue queue : childQueues) { childCapacities += queue.getCapacity(); + Resources.addTo(minResDefaultLabel, queue.getQueueResourceQuotas() + .getConfiguredMinResource()); + + // If any child queue is using percentage based capacity model vs parent + // queues' absolute configuration or vice versa, throw back an + // exception. + if (!queueName.equals("root") && getCapacity() != 0f + && !queue.getQueueResourceQuotas().getConfiguredMinResource() + .equals(Resources.none())) { + throw new IllegalArgumentException("Parent queue '" + getQueueName() + + "' and child queue '" + queue.getQueueName() + + "' should use either percentage based capacity" + + "configuration or absolute resource together."); + } } + float delta = Math.abs(1.0f - childCapacities); // crude way to check // allow capacities being set to 0, and enforce child 0 if parent is 0 - if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || ( - (queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { - throw new IllegalArgumentException( - "Illegal" + " capacity of " + childCapacities - + " for children of queue " + queueName); + if ((minResDefaultLabel.equals(Resources.none()) + && (queueCapacities.getCapacity() > 0) && (delta > PRECISION)) + || ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { + throw new IllegalArgumentException("Illegal" + " capacity of " + + childCapacities + " for children of queue " + queueName); } // check label capacities for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { float capacityByLabel = queueCapacities.getCapacity(nodeLabel); // check children's labels float sum = 0; + Resource minRes = Resources.createResource(0, 0); + Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel, + scheduler.getClusterResource()); for (CSQueue queue : childQueues) { sum += queue.getQueueCapacities().getCapacity(nodeLabel); + + // If any child queue of a label is using percentage based capacity + // model vs parent queues' absolute configuration or vice versa, throw + // back an exception + if (!queueName.equals("root") && !this.capacityConfigType + .equals(queue.getCapacityConfigType())) { + throw new IllegalArgumentException("Parent queue '" + getQueueName() + + "' and child queue '" + queue.getQueueName() + + "' should use either percentage based capacity" + + "configuration or absolute resource together for label:" + + nodeLabel); + } + + // Accumulate all min/max resource configured for all child queues. + Resources.addTo(minRes, queue.getQueueResourceQuotas() + .getConfiguredMinResource(nodeLabel)); } - if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) + if ((minResDefaultLabel.equals(Resources.none()) && capacityByLabel > 0 + && Math.abs(1.0f - sum) > PRECISION) || (capacityByLabel == 0) && (sum > 0)) { throw new IllegalArgumentException( "Illegal" + " capacity of " + sum + " for children of queue " + queueName + " for label=" + nodeLabel); } + + // Ensure that for each parent queue: parent.min-resource >= + // Σ(child.min-resource). + Resource parentMinResource = queueResourceQuotas + .getConfiguredMinResource(nodeLabel); + if (!parentMinResource.equals(Resources.none()) && Resources.lessThan( + resourceCalculator, resourceByLabel, parentMinResource, minRes)) { + throw new IllegalArgumentException("Parent Queues" + " capacity: " + + parentMinResource + " is less than" + " to its children:" + + minRes + " for queue:" + queueName); + } } this.childQueues.clear(); @@ -690,11 +737,8 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, child.getQueueResourceUsage().getUsed(nodePartition)); // Get child's max resource - Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), - child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition), - minimumAllocation); + Resource childConfiguredMaxResource = getEffectiveMaxCapacityByNormalizeDown( + nodePartition, minimumAllocation); // Child's limit should be capped by child configured max resource childLimit = @@ -830,6 +874,14 @@ public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { try { writeLock.lock(); + + // Update effective capacity in all parent queue. + Set configuredNodelabels = csContext.getConfiguration() + .getConfiguredNodeLabels(getQueuePath()); + for (String label : configuredNodelabels) { + calculateEffectiveResourcesAndCapacity(label, clusterResource); + } + // Update all children for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers @@ -845,7 +897,99 @@ public void updateClusterResource(Resource clusterResource, writeLock.unlock(); } } - + + private void calculateEffectiveResourcesAndCapacity(String label, + Resource clusterResource) { + + // For root queue, ensure that max/min resource is updated to latest + // cluster resource. + Resource resourceByLabel = labelManager.getResourceByLabel(label, + clusterResource); + if (getQueueName().equals("root")) { + queueResourceQuotas.setConfiguredMinResource(label, resourceByLabel); + queueResourceQuotas.setConfiguredMaxResource(label, resourceByLabel); + } + + // Total configured min resources of direct children of queue + Resource configuredMinResources = Resource.newInstance(0L, 0); + for (CSQueue childQueue : getChildQueues()) { + Resources.addTo(configuredMinResources, + childQueue.getQueueResourceQuotas().getConfiguredMinResource(label)); + } + + // Factor to scale down effective resource: When cluster has sufficient + // resources, effective_min_resources will be same as configured + // min_resources. + float effectiveMinRatio = 1; + ResourceCalculator rc = this.csContext.getResourceCalculator(); + if (getQueueName().equals("root")) { + if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc, + clusterResource, resourceByLabel, configuredMinResources)) { + effectiveMinRatio = Resources.divide(rc, clusterResource, + configuredMinResources, resourceByLabel); + } + } else { + if (Resources.lessThan(rc, clusterResource, + queueResourceQuotas.getEffectiveMinResource(label), + configuredMinResources)) { + effectiveMinRatio = Resources.divide(rc, clusterResource, + queueResourceQuotas.getEffectiveMinResource(label), + configuredMinResources); + } + } + + // loop and do this for all child queues + for (CSQueue childQueue : getChildQueues()) { + Resource minResource = childQueue.getQueueResourceQuotas() + .getConfiguredMinResource(label); + + // Update effective resource (min/max) to each child queue. + if (childQueue.getCapacityConfigType() + .equals(CapacityConfigType.ABSOLUTE_RESOURCE)) { + childQueue.getQueueResourceQuotas().setEffectiveMinResource(label, + Resources.multiply(minResource, effectiveMinRatio)); + + // Max resource of a queue should be a minimum of {configuredMaxRes, + // parentMaxRes}. parentMaxRes could be configured value. But if not + // present could also be taken from effective max resource of parent. + Resource parentMaxRes = queueResourceQuotas + .getConfiguredMaxResource(label); + if (parentMaxRes.equals(Resources.none())) { + parentMaxRes = parent.getQueueResourceQuotas() + .getEffectiveMaxResource(label); + } + + // Minimum of {childMaxResource, parentMaxRes}. However if + // childMaxResource is empty, consider parent's max resource alone. + Resource childMaxResource = childQueue.getQueueResourceQuotas() + .getConfiguredMaxResource(label); + Resource effMaxResource = Resources.min(resourceCalculator, + resourceByLabel, childMaxResource.equals(Resources.none()) + ? parentMaxRes + : childMaxResource, + parentMaxRes); + childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label, + Resources.clone(effMaxResource)); + } else { + childQueue.getQueueResourceQuotas().setEffectiveMinResource(label, + Resources.multiply(resourceByLabel, + childQueue.getQueueCapacities().getAbsoluteCapacity(label))); + childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label, + Resources.multiply(resourceByLabel, childQueue.getQueueCapacities() + .getAbsoluteMaximumCapacity(label))); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Updating effective min resource for queue:" + + childQueue.getQueueName() + " as effMinResource=" + + childQueue.getQueueResourceQuotas().getEffectiveMinResource(label) + + "and Updating effective max resource as effMaxResource=" + + childQueue.getQueueResourceQuotas() + .getEffectiveMaxResource(label)); + } + } + } + @Override public List getChildQueues() { try { @@ -975,9 +1119,21 @@ void allocateResource(Resource clusterResource, * When this happens, we have to preempt killable container (on same or different * nodes) of parent queue to avoid violating parent's max resource. */ - if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) - < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { - killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); + if (!queueResourceQuotas.getEffectiveMaxResource(nodePartition) + .equals(Resources.none())) { + if (Resources.lessThan(resourceCalculator, clusterResource, + queueResourceQuotas.getEffectiveMaxResource(nodePartition), + queueUsage.getUsed(nodePartition))) { + killContainersToEnforceMaxQueueCapacity(nodePartition, + clusterResource); + } + } else { + if (getQueueCapacities() + .getAbsoluteMaximumCapacity(nodePartition) < getQueueCapacities() + .getAbsoluteUsedCapacity(nodePartition)) { + killContainersToEnforceMaxQueueCapacity(nodePartition, + clusterResource); + } } } finally { writeLock.unlock(); @@ -994,8 +1150,7 @@ private void killContainersToEnforceMaxQueueCapacity(String partition, Resource partitionResource = labelManager.getResourceByLabel(partition, null); - Resource maxResource = Resources.multiply(partitionResource, - getQueueCapacities().getAbsoluteMaximumCapacity(partition)); + Resource maxResource = getEffectiveMaxCapacity(partition); while (Resources.greaterThan(resourceCalculator, partitionResource, queueUsage.getUsed(partition), maxResource)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index c2134eb..97740ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -630,10 +630,8 @@ private Resource computeUserLimit(String userName, Resource clusterResource, * * If we're running over capacity, then its (usedResources + required) * (which extra resources we are allocating) */ - Resource queueCapacity = Resources.multiplyAndNormalizeUp( - resourceCalculator, partitionResource, - lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition), - lQueue.getMinimumAllocation()); + Resource queueCapacity = lQueue.getEffectiveCapacityByNormalizeDown( + nodePartition, lQueue.getMinimumAllocation()); /* * Assume we have required resource equals to minimumAllocation, this can diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index 0544387..7b28f8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -20,9 +20,11 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; import java.util.Collections; @@ -121,6 +123,15 @@ public int compare(CSQueue q1, CSQueue q2) { // For queue with same used ratio / priority, queue with higher configured // capacity goes first if (0 == rc) { + Resource minEffRes1 = q1.getQueueResourceQuotas() + .getEffectiveMinResource(p); + Resource minEffRes2 = q2.getQueueResourceQuotas() + .getEffectiveMinResource(p); + if (!minEffRes1.equals(Resources.none()) + && !minEffRes2.equals(Resources.none())) { + return minEffRes2.compareTo(minEffRes1); + } + float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p); float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p); return Float.compare(abs2, abs1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java index 22705cc..86b2fea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java @@ -62,6 +62,8 @@ protected long pendingContainers; protected QueueCapacitiesInfo capacities; protected ResourcesInfo resources; + protected ResourceInfo minEffectiveCapacity; + protected ResourceInfo maxEffectiveCapacity; CapacitySchedulerQueueInfo() { }; @@ -105,6 +107,11 @@ ResourceUsage queueResourceUsage = q.getQueueResourceUsage(); populateQueueResourceUsage(queueResourceUsage); + + minEffectiveCapacity = new ResourceInfo( + q.getQueueResourceQuotas().getEffectiveMinResource()); + maxEffectiveCapacity = new ResourceInfo( + q.getQueueResourceQuotas().getEffectiveMaxResource()); } protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) { @@ -200,4 +207,12 @@ public QueueCapacitiesInfo getCapacities() { public ResourcesInfo getResources() { return resources; } + + public ResourceInfo getMinEffectiveCapacity(){ + return minEffectiveCapacity; + } + + public ResourceInfo getMaxEffectiveCapacity(){ + return maxEffectiveCapacity; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java new file mode 100644 index 0000000..f729ae8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java @@ -0,0 +1,173 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Assert; +import org.junit.Test; + +public class TestAbsoluteResourceConfiguration { + + private static final int GB = 1024; + + private static final String QUEUEA = "queueA"; + private static final String QUEUEB = "queueB"; + private static final String QUEUEC = "queueC"; + + private static final String QUEUEA_FULL = CapacitySchedulerConfiguration.ROOT + + "." + QUEUEA; + private static final String QUEUEB_FULL = CapacitySchedulerConfiguration.ROOT + + "." + QUEUEB; + private static final String QUEUEC_FULL = CapacitySchedulerConfiguration.ROOT + + "." + QUEUEC; + + private static final Resource QUEUE_A_MINRES = Resource.newInstance(100 * GB, + 10); + private static final Resource QUEUE_A_MAXRES = Resource.newInstance(200 * GB, + 30); + private static final Resource QUEUE_B_MINRES = Resource.newInstance(50 * GB, + 10); + private static final Resource QUEUE_B_MAXRES = Resource.newInstance(150 * GB, + 30); + private static final Resource QUEUE_C_MINRES = Resource.newInstance(50 * GB, + 10); + private static final Resource QUEUE_C_MAXRES = Resource.newInstance(150 * GB, + 20); + + private static Set resourceTypes = new HashSet<>( + Arrays.asList("memory", "vcores")); + + private CapacitySchedulerConfiguration setupQueueConfiguration( + boolean isCapacityNeeded) { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + // Set default capacities like normal configuration. + if (isCapacityNeeded) { + csConf.setCapacity(QUEUEA_FULL, 50f); + csConf.setCapacity(QUEUEB_FULL, 25f); + csConf.setCapacity(QUEUEC_FULL, 25f); + } + + return csConf; + } + + private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration( + CapacitySchedulerConfiguration csConf) { + // Update min/max resource to queueA/B/C + csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES); + csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES); + csConf.setMinimumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MINRES); + + csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES); + csConf.setMaximumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MAXRES); + csConf.setMaximumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MAXRES); + + return csConf; + } + + @Test + public void testSimpleMinMaxResourceConfigurartionPerQueue() { + + CapacitySchedulerConfiguration csConf = setupQueueConfiguration(true); + setupMinMaxResourceConfiguration(csConf); + + Assert.assertEquals("Min resource configured for QUEUEA is not correct", + QUEUE_A_MINRES, + csConf.getMinimumResourceRequirement("", QUEUEA_FULL, resourceTypes)); + Assert.assertEquals("Max resource configured for QUEUEA is not correct", + QUEUE_A_MAXRES, + csConf.getMaximumResourceRequirement("", QUEUEA_FULL, resourceTypes)); + Assert.assertEquals("Min resource configured for QUEUEB is not correct", + QUEUE_B_MINRES, + csConf.getMinimumResourceRequirement("", QUEUEB_FULL, resourceTypes)); + Assert.assertEquals("Max resource configured for QUEUEB is not correct", + QUEUE_B_MAXRES, + csConf.getMaximumResourceRequirement("", QUEUEB_FULL, resourceTypes)); + Assert.assertEquals("Min resource configured for QUEUEC is not correct", + QUEUE_C_MINRES, + csConf.getMinimumResourceRequirement("", QUEUEC_FULL, resourceTypes)); + Assert.assertEquals("Max resource configured for QUEUEC is not correct", + QUEUE_C_MAXRES, + csConf.getMaximumResourceRequirement("", QUEUEC_FULL, resourceTypes)); + } + + @Test + public void testEffectiveMinMaxResourceConfigurartionPerQueue() + throws Exception { + // create conf with basic queue configuration. + CapacitySchedulerConfiguration csConf = setupQueueConfiguration(false); + setupMinMaxResourceConfiguration(csConf); + + csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + @SuppressWarnings("resource") + MockRM rm = new MockRM(csConf); + rm.start(); + + // Add few nodes + rm.registerNode("127.0.0.1:1234", 250 * GB, 40); + + // Get queue object to verify min/max resource configuration. + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + LeafQueue qA = (LeafQueue) cs.getQueue(QUEUEA); + Assert.assertNotNull(qA); + Assert.assertEquals("Min resource configured for QUEUEA is not correct", + QUEUE_A_MINRES, qA.queueResourceQuotas.getConfiguredMinResource()); + Assert.assertEquals("Max resource configured for QUEUEA is not correct", + QUEUE_A_MAXRES, qA.queueResourceQuotas.getConfiguredMaxResource()); + Assert.assertEquals("Effective Min resource for QUEUEA is not correct", + QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource()); + Assert.assertEquals("Effective Max resource for QUEUEA is not correct", + QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource()); + + LeafQueue qB = (LeafQueue) cs.getQueue(QUEUEB); + Assert.assertNotNull(qB); + Assert.assertEquals("Min resource configured for QUEUEB is not correct", + QUEUE_B_MINRES, qB.queueResourceQuotas.getConfiguredMinResource()); + Assert.assertEquals("Max resource configured for QUEUEB is not correct", + QUEUE_B_MAXRES, qB.queueResourceQuotas.getConfiguredMaxResource()); + Assert.assertEquals("Effective Min resource for QUEUEB is not correct", + QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource()); + Assert.assertEquals("Effective Max resource for QUEUEB is not correct", + QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource()); + + LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC); + Assert.assertNotNull(qC); + Assert.assertEquals("Min resource configured for QUEUEC is not correct", + QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); + Assert.assertEquals("Max resource configured for QUEUEC is not correct", + QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + Assert.assertEquals("Effective Min resource for QUEUEC is not correct", + QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); + Assert.assertEquals("Effective Max resource for QUEUEC is not correct", + QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + + rm.stop(); + } +} -- 2.10.1 (Apple Git-78)