diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5dfef73..09d2230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -79,6 +79,7 @@ private Resource demand = Resources.createResource(0); private final FairScheduler scheduler; private Resource fairShare = Resources.createResource(0, 0); + private SchedulingAttributes attributes; // Preemption related variables private final Object preemptionVariablesLock = new Object(); @@ -120,6 +121,7 @@ public FSAppAttempt(FairScheduler scheduler, this.lastTimeAtFairShare = this.startTime; this.appPriority = Priority.newInstance(1); this.resourceWeights = new ResourceWeights(); + buildSchedulingAttributes(); } ResourceWeights getResourceWeights() { @@ -1378,4 +1380,14 @@ public String toString() { public boolean isPreemptable() { return getQueue().isPreemptable(); } + + @Override + public void buildSchedulingAttributes() { + attributes = new SchedulingAttributes(this); + } + + @Override + public SchedulingAttributes getSchedulingAttributes() { + return attributes; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index b911a1a..7fe3b47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -391,12 +391,13 @@ public Resource assignContainer(FSSchedulerNode node) { */ private TreeSet fetchAppsWithDemand(boolean assignment) { TreeSet pendingForResourceApps = - new TreeSet<>(policy.getComparator()); + new TreeSet<>(policy.getOptimizedComparator()); readLock.lock(); try { for (FSAppAttempt app : runnableApps) { if (!Resources.isNone(app.getPendingDemand()) && (assignment || app.shouldCheckForStarvation())) { + app.buildSchedulingAttributes(); pendingForResourceApps.add(app); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 1016823..2b199b61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -83,6 +83,7 @@ private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; + private SchedulingAttributes attributes; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -390,6 +391,16 @@ private void updatePreemptionVariables() { * Gets the children of this queue, if any. */ public abstract List getChildQueues(); + + @Override + public void buildSchedulingAttributes() { + attributes = new SchedulingAttributes(this); + } + + @Override + public SchedulingAttributes getSchedulingAttributes() { + return attributes; + } /** * Adds all applications in the queue and its subqueues to the given collection. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index fcdc056..10035f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -102,4 +102,12 @@ * false otherwise */ boolean isPreemptable(); + + /** Build the scheduling attributes of the schedulable, + * and it will be used in sort + */ + void buildSchedulingAttributes(); + + /** Get the scheduling attributes of the Schedulable. */ + SchedulingAttributes getSchedulingAttributes(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAttributes.java new file mode 100644 index 0000000..40d6ecd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAttributes.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +public class SchedulingAttributes { + private static final Resource ONE = Resources.createResource(1); + private static final DefaultResourceCalculator RESOURCE_CALCULATOR + = new DefaultResourceCalculator(); + private int demandCompareResult; + private boolean needy; + private double minShareRatio; + private boolean weightExceedZero; + private double memorySizePerWeight; + + public SchedulingAttributes(Schedulable sched) { + this.demandCompareResult = -1; + if (Resources.greaterThan(RESOURCE_CALCULATOR, null, + sched.getDemand(), Resources.none())) { + demandCompareResult = 1; + } else if (sched.getDemand().equals(Resources.none())) { + demandCompareResult = 0; + } + + Resource resourceUsage = sched.getResourceUsage(); + Resource minShare = Resources.min(RESOURCE_CALCULATOR, null, + sched.getMinShare(), sched.getDemand()); + + this.needy = Resources.lessThan(RESOURCE_CALCULATOR, null, + resourceUsage, minShare); + if (this.isNeedy()) { + this.minShareRatio = (double) resourceUsage.getMemorySize() / + Resources.max(RESOURCE_CALCULATOR, null, minShare, ONE) + .getMemorySize(); + } + + double weight = sched.getWeights().getWeight(ResourceType.MEMORY); + this.weightExceedZero = weight > 0.0; + + this.memorySizePerWeight = resourceUsage.getMemorySize(); + if (this.weightExceedZero) { + this.memorySizePerWeight /= weight; + } + } + + public int getDemandCompareResult() { + return demandCompareResult; + } + + public boolean isNeedy() { + return needy; + } + + public boolean isWeightExceedZero() { + return weightExceedZero; + } + + public double getMinShareRatio() { + return minShareRatio; + } + + public double getMemorySizePerWeight() { + return memorySizePerWeight; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 1fed9b0..bf90946 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -155,6 +155,14 @@ public void initialize(FSContext fsContext) {} public abstract Comparator getComparator(); /** + * The comparator returned by this method is to be used for sorting the + * {@link Schedulable}s in that queue. + * + * @return the comparator to sort by + */ + public abstract Comparator getOptimizedComparator(); + + /** * Computes and updates the shares of {@link Schedulable}s as per * the {@link SchedulingPolicy}, to be used later for scheduling decisions. * The shares computed are instantaneous and only consider queues with diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 72377b0..a9a03d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -65,6 +65,11 @@ public String getName() { } @Override + public Comparator getOptimizedComparator() { + return null; + } + + @Override public ResourceCalculator getResourceCalculator() { return CALCULATOR; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 0ef90a1..fd6182b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingAttributes; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; @@ -48,7 +49,9 @@ private static final DefaultResourceCalculator RESOURCE_CALCULATOR = new DefaultResourceCalculator(); private static final FairShareComparator COMPARATOR = - new FairShareComparator(); + new FairShareComparator(); + private static final OptimizedFairShareComparator OPTIMIZED_COMPARATOR = + new OptimizedFairShareComparator(); @Override public String getName() { @@ -189,12 +192,67 @@ private int compareFairShareUsage(Schedulable s1, Schedulable s2, } } + private static class OptimizedFairShareComparator implements + Comparator, Serializable { + @Override + public int compare(Schedulable s1, Schedulable s2) { + SchedulingAttributes attr1 = s1.getSchedulingAttributes(); + SchedulingAttributes attr2 = s2.getSchedulingAttributes(); + + int res = 0; + if (attr1.getDemandCompareResult() > 0 && + attr2.getDemandCompareResult() == 0) { + res = -1; + } else if (attr1.getDemandCompareResult() == 0 && + attr2.getDemandCompareResult() > 0) { + res = 1; + } + + if (res == 0) { + if (attr1.isNeedy() && !attr2.isNeedy()) { + res = -1; + } else if (!attr1.isNeedy() && attr2.isNeedy()) { + res = 1; + } else if (attr1.isNeedy() && attr2.isNeedy()) { + res = (int) Math.signum( + attr1.getMinShareRatio() - attr2.getMinShareRatio()); + } + } + + if (res == 0) { + if (attr1.isWeightExceedZero() && !attr2.isWeightExceedZero()) { + res = 1; + } else if (!attr1.isWeightExceedZero() && attr2.isWeightExceedZero()) { + res = -1; + } + } + + if (res == 0) { + res = (int) Math.signum( + attr1.getMemorySizePerWeight() - attr2.getMemorySizePerWeight()); + } + if (res == 0) { + res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + } + + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } + return res; + } + } + @Override public Comparator getComparator() { return COMPARATOR; } @Override + public Comparator getOptimizedComparator() { + return OPTIMIZED_COMPARATOR; + } + + @Override public ResourceCalculator getResourceCalculator() { return RESOURCE_CALCULATOR; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 7dd45cb..10ce345 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -80,6 +80,11 @@ public int compare(Schedulable s1, Schedulable s2) { } @Override + public Comparator getOptimizedComparator() { + return null; + } + + @Override public ResourceCalculator getResourceCalculator() { return CALCULATOR; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index 36ff85e..90517f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -142,4 +142,13 @@ public void updateDemand() {} public boolean isPreemptable() { return true; } + + @Override + public void buildSchedulingAttributes() { + } + + @Override + public SchedulingAttributes getSchedulingAttributes() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index 3a16454..8e081de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -226,6 +226,7 @@ private void swap(Schedulable[] array, int x, int y) { private long startTime; private Resource usage; private ResourceWeights weights; + private SchedulingAttributes attributes; public MockSchedulable(Resource minShare, Resource demand, String name, long startTime, Resource usage, ResourceWeights weights) { @@ -235,6 +236,7 @@ public MockSchedulable(Resource minShare, Resource demand, String name, this.startTime = startTime; this.usage = usage; this.weights = weights; + attributes = new SchedulingAttributes(this); } @Override @@ -308,6 +310,16 @@ public String toString() { public boolean isPreemptable() { return true; } + + @Override + public void buildSchedulingAttributes() { + attributes = new SchedulingAttributes(this); + } + + @Override + public SchedulingAttributes getSchedulingAttributes() { + return attributes; + } } }