diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfaeba4..f3ee1e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.DominantResourceFairnessOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -132,6 +133,8 @@ public static final String FAIR_ORDERING_POLICY = "fair"; + public static final String DOMINANT_RESOURCE_FAIRNESS_ORDERING_POLICY = "dominant-resource-fairness"; + public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY; @Private @@ -412,6 +415,9 @@ public int getUserLimit(String queue) { if (policyType.trim().equals(FAIR_ORDERING_POLICY)) { policyType = FairOrderingPolicy.class.getName(); } + if 
(policyType.trim().equals(DOMINANT_RESOURCE_FAIRNESS_ORDERING_POLICY)) { + policyType = DominantResourceFairnessOrderingPolicy.class.getName(); + } try { orderingPolicy = (OrderingPolicy) Class.forName(policyType).newInstance(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 18b38f4..2c73aeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1868,6 +1868,9 @@ public void updateClusterResource(Resource clusterResource, RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } + + // Inform the ordering policy + orderingPolicy.clusterResourceUpdated(clusterResource); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index b7cb1bf..2c39734 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -139,5 +140,8 @@ public abstract void containerReleased(S schedulableEntity, @Override public abstract String getInfo(); - + + @Override + public void clusterResourceUpdated(Resource r) {} + }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java
new file mode 100644
index 0000000..29257c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+/**
+ * An OrderingPolicy which orders SchedulableEntities for Dominant Resource
+ * Fairness.
+ *
+ * Entities are compared on their dominant share: the larger of their memory and
+ * vcore usage, each taken as a fraction of the cluster resource. Equal dominant
+ * shares are compared on the smaller (non-dominant) share. The constructor
+ * chains a FifoComparator after this comparison.
+ */
+public class DominantResourceFairnessOrderingPolicy<S extends SchedulableEntity>
+    extends AbstractComparatorOrderingPolicy<S> {
+
+  private static final Log LOG =
+      LogFactory.getLog(DominantResourceFairnessOrderingPolicy.class);
+
+  public static final String ENABLE_SIZE_BASED_WEIGHT =
+      "fair.enable-size-based-weight";
+
+  /**
+   * Compares two entities by dominant share, then by non-dominant share.
+   * While the cluster resource has zero memory or zero vcores no share can be
+   * computed, so the raw memory and vcore usage is compared instead.
+   */
+  protected class FairComparator implements Comparator<SchedulableEntity> {
+    @Override
+    public int compare(final SchedulableEntity r1, final SchedulableEntity r2) {
+      Resource lhs = r1.getSchedulingResourceUsage().getCachedUsed(
+          CommonNodeLabelsManager.ANY);
+      Resource rhs = r2.getSchedulingResourceUsage().getCachedUsed(
+          CommonNodeLabelsManager.ANY);
+      if (isInvalidDivisor()) {
+        if ((lhs.getMemory() < rhs.getMemory()
+            && lhs.getVirtualCores() > rhs.getVirtualCores())
+            || (lhs.getMemory() > rhs.getMemory()
+            && lhs.getVirtualCores() < rhs.getVirtualCores())) {
+          // Each entity uses more of one resource: neither one wins.
+          return 0;
+        } else if (lhs.getMemory() > rhs.getMemory()
+            || lhs.getVirtualCores() > rhs.getVirtualCores()) {
+          return 1;
+        } else if (lhs.getMemory() < rhs.getMemory()
+            || lhs.getVirtualCores() < rhs.getVirtualCores()) {
+          return -1;
+        }
+        // Identical usage. Return here rather than falling through to the
+        // share computation, which would divide by the zero cluster resource.
+        return 0;
+      }
+
+      double r1v = getValue(r1, true);
+      double r2v = getValue(r2, true);
+      if (r1v < r2v) {
+        return -1;
+      } else if (r1v > r2v) {
+        return 1;
+      }
+
+      // Equal dominant shares: decide on the non-dominant share.
+      r1v = getValue(r1, false);
+      r2v = getValue(r2, false);
+      if (r1v < r2v) {
+        return -1;
+      } else if (r1v > r2v) {
+        return 1;
+      }
+      return 0;
+    }
+  }
+
+  private CompoundComparator fairComparator;
+
+  private boolean sizeBasedWeight = false;
+
+  // Overwritten in place by clusterResourceUpdated(); zero until then.
+  private final Resource clusterResource = Resource.newInstance(0, 0);
+
+  public DominantResourceFairnessOrderingPolicy() {
+    List<Comparator<SchedulableEntity>> comparators =
+        new ArrayList<Comparator<SchedulableEntity>>();
+    comparators.add(new FairComparator());
+    comparators.add(new FifoComparator());
+    fairComparator = new CompoundComparator(
+        comparators
+    );
+    this.comparator = fairComparator;
+    this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator);
+  }
+
+  /**
+   * @return true while the cluster resource has zero memory or zero vcores,
+   *         i.e. while usage cannot be divided by it to obtain a share
+   */
+  public boolean isInvalidDivisor() {
+    return clusterResource.getMemory() == 0
+        || clusterResource.getVirtualCores() == 0;
+  }
+
+  /**
+   * Expresses a resource as a fraction of the cluster resource.
+   *
+   * @param resource the resource to convert
+   * @param dominant true to return the larger of the memory and vcore
+   *                 fractions, false to return the smaller one
+   * @return the selected fraction
+   */
+  private float getResourceAsValue(Resource resource, boolean dominant) {
+    float memoryShare =
+        (float) resource.getMemory() / clusterResource.getMemory();
+    float vcoreShare =
+        (float) resource.getVirtualCores() / clusterResource.getVirtualCores();
+    return dominant
+        ? Math.max(memoryShare, vcoreShare)
+        : Math.min(memoryShare, vcoreShare);
+  }
+
+  /**
+   * Computes the value an entity is ordered by: its used share, divided by a
+   * demand-derived weight when size based weight is enabled.
+   */
+  private double getValue(SchedulableEntity r, boolean dominant) {
+    Resource usedResource = r.getSchedulingResourceUsage().getCachedUsed(
+        CommonNodeLabelsManager.ANY);
+    double value = getResourceAsValue(usedResource, dominant);
+
+    if (sizeBasedWeight) {
+      Resource demandResource = r.getSchedulingResourceUsage().getCachedDemand(
+          CommonNodeLabelsManager.ANY);
+      double demandDominantShare = getResourceAsValue(demandResource, true);
+      double weight = Math.log1p(demandDominantShare) / Math.log(2);
+      // Zero demand gives a zero weight. With zero usage as well the division
+      // would be 0 / 0 = NaN, which compares as neither smaller nor larger
+      // than any value and so makes the ordering non-transitive. Leave such
+      // an entity at 0; every other case divides as before.
+      if (weight > 0 || value > 0) {
+        value = value / weight;
+      }
+    }
+    return value;
+  }
+
+  @VisibleForTesting
+  public boolean getSizeBasedWeight() {
+    return sizeBasedWeight;
+  }
+
+  @VisibleForTesting
+  public void setSizeBasedWeight(boolean sizeBasedWeight) {
+    this.sizeBasedWeight = sizeBasedWeight;
+  }
+
+  /**
+   * Reads ENABLE_SIZE_BASED_WEIGHT from the given configuration; the current
+   * setting is kept when the key is absent.
+   */
+  @Override
+  public void configure(Map<String, String> conf) {
+    if (conf.containsKey(ENABLE_SIZE_BASED_WEIGHT)) {
+      sizeBasedWeight =
+          Boolean.parseBoolean(conf.get(ENABLE_SIZE_BASED_WEIGHT));
+    }
+  }
+
+  @Override
+  public void containerAllocated(S schedulableEntity,
+      RMContainer r) {
+    entityRequiresReordering(schedulableEntity);
+  }
+
+  @Override
+  public void containerReleased(S schedulableEntity,
+      RMContainer r) {
+    entityRequiresReordering(schedulableEntity);
+  }
+
+  @Override
+  public void demandUpdated(S schedulableEntity) {
+    // Demand only influences the ordering through the size based weight.
+    if (sizeBasedWeight) {
+      entityRequiresReordering(schedulableEntity);
+    }
+  }
+
+  @Override
+  public String getInfo() {
+    String sbw = sizeBasedWeight ? " with sizeBasedWeight" : "";
+    return "DominantResourceFairnessOrderingPolicy" + sbw;
+  }
+
+  /**
+   * Records the new cluster resource and passes every entity to
+   * entityRequiresReordering, since all shares depend on the cluster resource.
+   */
+  @Override
+  public void clusterResourceUpdated(Resource r) {
+    LOG.info("update clusterResource memory=" + r.getMemory() +
+        " vcores=" + r.getVirtualCores());
+    clusterResource.setMemory(r.getMemory());
+    clusterResource.setVirtualCores(r.getVirtualCores());
+    for (S entity: getSchedulableEntities()) {
+      entityRequiresReordering(entity);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java index 1616bb1..5a6e273 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; @@ -110,5 +111,7 @@ public void containerReleased(S schedulableEntity, * Display information regarding configuration and status */ public String getInfo(); + + public void clusterResourceUpdated(Resource r); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java
new file mode 100644
index 0000000..d8d3e6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Tests the comparator and the iteration order of
+ * DominantResourceFairnessOrderingPolicy.
+ */
+public class TestDominantResourceFairnessOrderingPolicy {
+
+  final static int GB = 1024;
+
+  @Test
+  public void testSimpleComparison() {
+    DominantResourceFairnessOrderingPolicy<MockSchedulableEntity> policy =
+        new DominantResourceFairnessOrderingPolicy<MockSchedulableEntity>();
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    policy.clusterResourceUpdated(Resources.createResource(24 * GB, 16));
+    Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
+
+    // r1: max(2/24, 8/16) => 1/2
+    // r2: max(4/24, 2/16) => 1/6
+    r1.setUsed(Resources.createResource(2 * GB, 8));
+    r2.setUsed(Resources.createResource(4 * GB, 2));
+
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r2.getSchedulingResourceUsage());
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+
+    // r1: max(2/24, 8/16) => 1/2
+    // r2: max(16/24, 4/16) => 2/3
+    r2.setUsed(Resources.createResource(16 * GB, 4));
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r2.getSchedulingResourceUsage());
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
+
+    // r1: max(2/24, 8/16) => 1/2, min(2/24, 8/16) => 1/12
+    // r2: max(12/24, 1/16) => 1/2, min(12/24, 1/16) => 1/16
+    r2.setUsed(Resources.createResource(12 * GB, 1));
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r2.getSchedulingResourceUsage());
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+  }
+
+  @Test
+  public void testSizeBasedWeight() {
+    DominantResourceFairnessOrderingPolicy<MockSchedulableEntity> policy =
+        new DominantResourceFairnessOrderingPolicy<MockSchedulableEntity>();
+    policy.setSizeBasedWeight(true);
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    policy.clusterResourceUpdated(Resources.createResource(240 * GB, 16));
+    //No changes, equal
+    Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
+
+    r1.setUsed(Resources.createResource(4 * GB, 2));
+    r2.setUsed(Resources.createResource(4 * GB, 2));
+
+    r1.setPending(Resources.createResource(4 * GB, 2));
+    r2.setPending(Resources.createResource(4 * GB, 2));
+
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r2.getSchedulingResourceUsage());
+
+    //Same, equal
+    Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
+
+    r2.setUsed(Resources.createResource(5 * GB, 2));
+    r2.setPending(Resources.createResource(5 * GB, 2));
+
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r2.getSchedulingResourceUsage());
+
+    //More demand and consumption, but not enough more demand to overcome
+    //additional consumption
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
+
+    //High demand, enough to reverse sbw
+    r2.setPending(Resources.createResource(100 * GB, 2));
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        r2.getSchedulingResourceUsage());
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+  }
+
+  @Test
+  public void testIterators() {
+    OrderingPolicy<MockSchedulableEntity> schedOrder =
+        new DominantResourceFairnessOrderingPolicy<MockSchedulableEntity>();
+
+    schedOrder.clusterResourceUpdated(Resources.createResource(10 * GB, 16));
+
+    MockSchedulableEntity msp1 = new MockSchedulableEntity();
+    MockSchedulableEntity msp2 = new MockSchedulableEntity();
+    MockSchedulableEntity msp3 = new MockSchedulableEntity();
+
+    msp1.setId("1");
+    msp2.setId("2");
+    msp3.setId("3");
+
+    msp1.setUsed(Resources.createResource(3 * GB, 1));
+    msp2.setUsed(Resources.createResource(2 * GB, 1));
+    msp3.setUsed(Resources.createResource(1 * GB, 2));
+
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        msp1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        msp2.getSchedulingResourceUsage());
+    // Refresh msp3 too, so that all three entities are prepared the same way.
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+        msp3.getSchedulingResourceUsage());
+
+    schedOrder.addSchedulableEntity(msp1);
+    schedOrder.addSchedulableEntity(msp2);
+    schedOrder.addSchedulableEntity(msp3);
+
+    // msp1: max(3/10, 1/16) => 3/10
+    // msp2: max(2/10, 1/16) => 2/10
+    // msp3: max(1/10, 2/16) => 1/8
+    //Assignment, least to greatest consumption
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    //Preemption, greatest to least
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
+
+    //Change value without inform, should see no change
+    msp2.setUsed(Resources.createResource(4 * GB, 1));
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
+    //Do inform, will reorder
+    schedOrder.containerAllocated(msp2, null);
+
+    // msp1: max(3/10, 1/16) => 3/10
+    // msp2: max(4/10, 1/16) => 4/10
+    // msp3: max(1/10, 2/16) => 1/8
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
+
+    // Reordering when cluster resource is updated.
+    schedOrder.clusterResourceUpdated(Resources.createResource(100 * GB, 16));
+
+    // msp1: max(3/100, 1/16) => 1/16
+    // msp2: max(4/100, 1/16) => 1/16
+    // msp3: max(1/100, 2/16) => 2/16
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"1", "2", "3"});
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"3", "2", "1"});
+  }
+
+  /**
+   * Asserts that the iterator yields the given ids first, in that order.
+   */
+  public void checkIds(Iterator<MockSchedulableEntity> si,
+      String[] ids) {
+    for (int i = 0;i < ids.length;i++) {
+      Assert.assertEquals(ids[i], si.next().getId());
+    }
+  }
+
+}