diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java new file mode 100644 index 0000000..1e146f3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * An OrderingPolicy which orders SchedulableEntities for fairness (see + * FairScheduler + * FairSharePolicy), generally, processes with lesser usage are lesser. If + * sizedBasedWeight is set to true then an application with high demand + * may be prioritized ahead of an application with less usage. This + * is to offset the tendency to favor small apps, which could result in + * starvation for large apps if many small ones enter and leave the queue + * continuously (optional, default false) + */ +public class FairOrderingPolicy extends AbstractComparatorOrderingPolicy { + + private static final Log LOG = LogFactory.getLog(FairOrderingPolicy.class); + + protected class FairComparator implements Comparator { + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) ); + return res; + } + } + + private AbstractComparatorOrderingPolicy.CompoundComparator fairComparator; + + private boolean sizeBasedWeight = false; + + public FairOrderingPolicy() { + List> comparators = + new ArrayList>(); + comparators.add(new FairComparator()); + comparators.add(new FifoOrderingPolicy.FifoComparator()); + fairComparator = new AbstractComparatorOrderingPolicy.CompoundComparator( + comparators + ); + setComparator(fairComparator); + } + + public double getMagnitude(SchedulableEntity r) { + double mag = r.getSchedulingConsumption().getMemory(); + if (sizeBasedWeight) { + double weight = Math.log1p(r.getSchedulingDemand().getMemory()) / Math.log(2); + mag = mag / weight; + } + return mag; + } + + public boolean getSizeBasedWeight() { + return sizeBasedWeight; + } + + public void setSizeBasedWeight(boolean sizeBasedWeight) { + this.sizeBasedWeight = sizeBasedWeight; + } + + @Override + public void configure(String conf) { + if (conf.indexOf("sizeBasedWeight") >= 0) { + sizeBasedWeight = true; + } + } + + @Override + public void containerAllocated(S schedulableEntity, + RMContainer r) { + reorderSchedulableEntity(schedulableEntity); + } + + @Override + public void containerReleased(S schedulableEntity, + RMContainer r) { + reorderSchedulableEntity(schedulableEntity); + } + + @Override + public String getInfo() { + String sbw = sizeBasedWeight ? " with sizeBasedWeight" : ""; + return "FairOrderingPolicy" + sbw; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java new file mode 100644 index 0000000..d92a9a6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestFairOrderingPolicy { + + final static int GB = 1024; + + @Test + public void testSimpleComparison() { + FairOrderingPolicy policy = new FairOrderingPolicy(); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + //consumption + r1.setConsumption(Resources.createResource(1, 0)); + r1.updateSchedulingState(); + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testSizeBasedWeight() { + FairOrderingPolicy policy = new FairOrderingPolicy(); + policy.setSizeBasedWeight(true); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + //No changes, equal + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + r1.setConsumption(Resources.createResource(4 * GB)); + r2.setConsumption(Resources.createResource(4 * GB)); + + r1.setDemand(Resources.createResource(4 * GB)); + r2.setDemand(Resources.createResource(4 * GB)); + + r1.updateSchedulingState(); + r2.updateSchedulingState(); + + //Same, equal + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + r2.setConsumption(Resources.createResource(5 * GB)); + r2.setDemand(Resources.createResource(5 * GB)); + + r2.updateSchedulingState(); + + //More demand and consumption, but not enough more demand to overcome + //additional consumption + Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0); + + //High demand, enough to reverse sbw + r2.setDemand(Resources.createResource(100 * GB)); + r2.updateSchedulingState(); + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testIterators() { + OrderingPolicy schedOrder = + new FairOrderingPolicy(); + + MockSchedulableEntity msp1 = new MockSchedulableEntity(); + MockSchedulableEntity msp2 = new MockSchedulableEntity(); + MockSchedulableEntity msp3 = new MockSchedulableEntity(); + + msp1.setConsumption(Resources.createResource(3)); + msp2.setConsumption(Resources.createResource(2)); + msp3.setConsumption(Resources.createResource(1)); + + msp1.updateSchedulingState(); + msp2.updateSchedulingState(); + msp3.updateSchedulingState(); + + schedOrder.addSchedulableEntity(msp1); + schedOrder.addSchedulableEntity(msp2); + schedOrder.addSchedulableEntity(msp3); + + + //Assignment, least to greatest consumption + checkConsumption(schedOrder.getAssignmentIterator(), new int[]{1, 2, 3}); + + //Preemption, greatest to least + checkConsumption(schedOrder.getPreemptionIterator(), new int[]{3, 2, 1}); + + //Change value without inform, should see no change + msp2.setConsumption(Resources.createResource(6)); + checkConsumption(schedOrder.getAssignmentIterator(), new int[]{1, 2, 3}); + checkConsumption(schedOrder.getPreemptionIterator(), new int[]{3, 2, 1}); + + //Do inform, will reorder + schedOrder.containerAllocated(msp2, null); + checkConsumption(schedOrder.getAssignmentIterator(), new int[]{1, 3, 6}); + checkConsumption(schedOrder.getPreemptionIterator(), new int[]{6, 3, 1}); + } + + public void checkConsumption(Iterator si, + int[] demands) { + for (int i = 0;i < demands.length;i++) { + Assert.assertEquals(si.next().getSchedulingConsumption().getMemory(), + demands[i]); + } + } + +}