diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java new file mode 100644 index 0000000..fec5cf4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + + +//Some policies will use multiple comparators joined together +class CompoundComparator implements Comparator { + + List> comparators; + + CompoundComparator(List> comparators) { + this.comparators = comparators; + } + + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + for (Comparator comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) return result; + } + return 0; + } +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java new file mode 100644 index 0000000..beedfac --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +/** + * An OrderingPolicy which orders SchedulableEntities for fairness (see + * FairScheduler + * FairSharePolicy), generally, processes with lesser usage are lesser. If + * sizedBasedWeight is set to true then an application with high demand + * may be prioritized ahead of an application with less usage. This + * is to offset the tendency to favor small apps, which could result in + * starvation for large apps if many small ones enter and leave the queue + * continuously (optional, default false) + */ +public class FairOrderingPolicy extends AbstractComparatorOrderingPolicy { + + private static final Log LOG = LogFactory.getLog(FairOrderingPolicy.class); + + protected class FairComparator implements Comparator { + @Override + public int compare(SchedulableEntity r1, SchedulableEntity r2) { + int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) ); + return res; + } + } + + private CompoundComparator fairComparator; + + private boolean sizeBasedWeight = false; + + public FairOrderingPolicy() { + List> comparators = + new ArrayList>(); + comparators.add(new FairComparator()); + comparators.add(new FifoOrderingPolicy.FifoComparator()); + fairComparator = new CompoundComparator( + comparators + ); + setComparator(fairComparator); + } + + public double getMagnitude(SchedulableEntity r) { + double mag = r.getSchedulingResourceUsage().getUsed( + CommonNodeLabelsManager.ANY).getMemory(); + if (sizeBasedWeight) { + double weight = Math.log1p(r.getSchedulingResourceUsage().getDemand( + CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2); + mag = mag / weight; + } + return mag; + } + + public boolean getSizeBasedWeight() { + return sizeBasedWeight; + } + + public void setSizeBasedWeight(boolean sizeBasedWeight) { + this.sizeBasedWeight = sizeBasedWeight; + } + + @Override + public void configure(String conf) { + if (conf.indexOf("sizeBasedWeight") >= 0) { + sizeBasedWeight = true; + } + } + + @Override + public void containerAllocated(S schedulableEntity, + RMContainer r) { + reorderSchedulableEntity(schedulableEntity); + } + + @Override + public void containerReleased(S schedulableEntity, + RMContainer r) { + reorderSchedulableEntity(schedulableEntity); + } + + @Override + public String getInfo() { + String sbw = sizeBasedWeight ? " with sizeBasedWeight" : ""; + return "FairOrderingPolicy" + sbw; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java new file mode 100644 index 0000000..4192b82 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestFairOrderingPolicy { + + final static int GB = 1024; + + @Test + public void testSimpleComparison() { + FairOrderingPolicy policy = new FairOrderingPolicy(); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + //consumption + r1.setUsed(Resources.createResource(1, 0)); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r1.getSchedulingResourceUsage()); + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testSizeBasedWeight() { + FairOrderingPolicy policy = new FairOrderingPolicy(); + policy.setSizeBasedWeight(true); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + //No changes, equal + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + r1.setUsed(Resources.createResource(4 * GB)); + r2.setUsed(Resources.createResource(4 * GB)); + + r1.setPending(Resources.createResource(4 * GB)); + r2.setPending(Resources.createResource(4 * GB)); + + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r2.getSchedulingResourceUsage()); + + //Same, equal + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + r2.setUsed(Resources.createResource(5 * GB)); + r2.setPending(Resources.createResource(5 * GB)); + + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r2.getSchedulingResourceUsage()); + + //More demand and consumption, but not enough more demand to overcome + //additional consumption + Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0); + + //High demand, enough to reverse sbw + r2.setPending(Resources.createResource(100 * GB)); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r2.getSchedulingResourceUsage()); + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testIterators() { + OrderingPolicy schedOrder = + new FairOrderingPolicy(); + + MockSchedulableEntity msp1 = new MockSchedulableEntity(); + MockSchedulableEntity msp2 = new MockSchedulableEntity(); + MockSchedulableEntity msp3 = new MockSchedulableEntity(); + + msp1.setId("1"); + msp2.setId("2"); + msp3.setId("3"); + + msp1.setUsed(Resources.createResource(3)); + msp2.setUsed(Resources.createResource(2)); + msp3.setUsed(Resources.createResource(1)); + + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + msp1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + msp2.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + msp2.getSchedulingResourceUsage()); + + schedOrder.addSchedulableEntity(msp1); + schedOrder.addSchedulableEntity(msp2); + schedOrder.addSchedulableEntity(msp3); + + + //Assignment, least to greatest consumption + checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"}); + + //Preemption, greatest to least + checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"}); + + //Change value without inform, should see no change + msp2.setUsed(Resources.createResource(6)); + checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"}); + checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"}); + + //Do inform, will reorder + schedOrder.containerAllocated(msp2, null); + checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"}); + checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"}); + } + + public void checkIds(Iterator si, + String[] ids) { + for (int i = 0;i < ids.length;i++) { + Assert.assertEquals(si.next().getId(), + ids[i]); + } + } + +}