diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 42d0420..194be2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -74,6 +74,7 @@ public String getName() { public int compare(Schedulable s1, Schedulable s2) { double minShareRatio1, minShareRatio2; double useToWeightRatio1, useToWeightRatio2; + double weight1, weight2; Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null, s1.getMinShare(), s1.getDemand()); Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null, @@ -86,10 +87,28 @@ public int compare(Schedulable s1, Schedulable s2) { / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize(); minShareRatio2 = (double) s2.getResourceUsage().getMemorySize() / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize(); - useToWeightRatio1 = s1.getResourceUsage().getMemorySize() / - s1.getWeights().getWeight(ResourceType.MEMORY); - useToWeightRatio2 = s2.getResourceUsage().getMemorySize() / - s2.getWeights().getWeight(ResourceType.MEMORY); + + weight1 = s1.getWeights().getWeight(ResourceType.MEMORY); + weight2 = s2.getWeights().getWeight(ResourceType.MEMORY); + if (weight1 > 0.0 && weight2 > 0.0) { + useToWeightRatio1 = s1.getResourceUsage().getMemorySize() / weight1; + useToWeightRatio2 = s2.getResourceUsage().getMemorySize() / weight2; + } else { // Either weight1 or weight2 equals to 0 + if (weight1 == weight2) { // If they have same weight, just compare usage + useToWeightRatio1 = s1.getResourceUsage().getMemorySize(); + useToWeightRatio2 = s2.getResourceUsage().getMemorySize(); + } else { + // When we arrive here, let's give slots to the one weight != 0 + if (weight1 != 0.0) { + useToWeightRatio1 = 0.0; + useToWeightRatio2 = 1.0; + } else { + useToWeightRatio1 = 1.0; + useToWeightRatio2 = 0.0; + } + } + } + int res = 0; if (s1Needy && !s2Needy) res = -1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestFairShareComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestFairShareComparator.java new file mode 100644 index 0000000..f382411 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestFairShareComparator.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestFairShareComparator { + + private static final Log LOG = LogFactory.getLog(TestFairShareComparator.class); + + private Comparator fairShareComparator; + + /* + * Use the following data collections to generate three Schedulable objects, + * and then verify the transitivity of FairShareComparator. We can get + * 100% code coverage by DFS. + * + */ + private Resource minShare = Resource.newInstance(0, 1); + + private Resource demand = Resource.newInstance(4, 1); + + private String[] nameCollection = {"A", "B", "C"}; + + private long[] startTimeColloection = {1L, 2L, 3L}; + + private Resource[] usageCollection = { + Resource.newInstance(0, 1), + Resource.newInstance(2, 1), + Resource.newInstance(4, 1) + }; + + private ResourceWeights[] weightsCollection = { + new ResourceWeights(0.0f), + new ResourceWeights(1.0f), + new ResourceWeights(2.0f) + }; + + + + @Before + public void setup() { + FairSharePolicy policy = new FairSharePolicy(); + fairShareComparator = policy.getComparator(); + } + + @Test + public void testTransitivity() { + generateAndTest(new Stack()); + } + + private void generateAndTest(Stack genSchedulable) { + + if (genSchedulable.size() == 3) { + // We get three Schedulable objects, let's use them to check the comparator. + Assert.assertTrue("The comparator must ensure transitivity", + checkTransitivity(genSchedulable)); + return; + } + + for (int i = 0; i < nameCollection.length; i++) { + for (int j = 0; j < startTimeColloection.length; j++) { + for (int k = 0; k < usageCollection.length; k++) { + for (int t = 0; t < weightsCollection.length; t++) { + genSchedulable.push(createSchedulable(i, j, k, t)); + generateAndTest(genSchedulable); + genSchedulable.pop(); + } + } + } + } + + } + + private Schedulable createSchedulable( + int nameIdx, int startTimeIdx, int usageIdx, int weightsIdx) { + return new MockSchedulable(minShare, demand, nameCollection[nameIdx], + startTimeColloection[startTimeIdx],usageCollection[usageIdx], + weightsCollection[weightsIdx]); + } + + private boolean checkTransitivity(Collection schedulableObjs) { + Assert.assertEquals(3, schedulableObjs.size()); + Schedulable[] copy = schedulableObjs.toArray(new Schedulable[3]); + + if (fairShareComparator.compare(copy[0], copy[1]) > 0) { + swap(copy, 0, 1); + } + + if (fairShareComparator.compare(copy[1], copy[2]) > 0) { + swap(copy, 1, 2); + + if (fairShareComparator.compare(copy[0], copy[1]) > 0) { + swap(copy, 0, 1); + } + } + + // Here, we have got the condition "copy[0] <= copy[1] && copy[1] <= copy[2]". + // Just check copy[0] <= copy[2] + if (fairShareComparator.compare(copy[0], copy[2]) <= 0) { + return true; + } else { + LOG.fatal("Failure data: " + copy[0] + " " + copy[1] + " " + copy[2]); + return false; + } + } + + private void swap(Schedulable[] array, int x, int y) { + Schedulable tmp = array[x]; + array[x] = array[y]; + array[y] = tmp; + } + + + + private class MockSchedulable implements Schedulable { + + private Resource minShare; + private Resource demand; + private String name; + private long startTime; + private Resource usage; + private ResourceWeights weights; + + + public MockSchedulable(Resource minShare, Resource demand, String name, long startTime, + Resource usage, ResourceWeights weights) { + super(); + this.minShare = minShare; + this.demand = demand; + this.name = name; + this.startTime = startTime; + this.usage = usage; + this.weights = weights; + } + + @Override + public String getName() { + return name; + } + + @Override + public Resource getDemand() { + return demand; + } + + @Override + public Resource getResourceUsage() { + return usage; + } + + @Override + public Resource getMinShare() { + return minShare; + } + + @Override + public ResourceWeights getWeights() { + return weights; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public Resource getMaxShare() { + throw new UnsupportedOperationException(); + } + + @Override + public Priority getPriority() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDemand() { + throw new UnsupportedOperationException(); + } + + @Override + public Resource assignContainer(FSSchedulerNode node) { + throw new UnsupportedOperationException(); + } + + @Override + public RMContainer preemptContainer() { + throw new UnsupportedOperationException(); + } + + @Override + public Resource getFairShare() { + throw new UnsupportedOperationException(); + } + + @Override + public void setFairShare(Resource fairShare) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return "{name:" + name + ", start:" + startTime + ", usage:" + usage + ", weights:" + weights + + ", demand:" + demand + ", minShare:" + minShare + "}"; + } + } +}