diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index c1ce6da2203..7334d689a74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -170,6 +170,8 @@ public static final String FIFO_FOR_PENDING_APPS = "fifo-for-pending-apps"; + public static final String DRF_APP_ORDERING_POLICY = "drf"; + public static final String DEFAULT_APP_ORDERING_POLICY = FIFO_APP_ORDERING_POLICY; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1028a7d0b2e..2c301bd3b76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2111,6 +2111,7 @@ void setOrderingPolicy( writeLock.lock(); try { if (null != this.orderingPolicy) { + orderingPolicy.initRMContext(this.csContext.getRMContext()); orderingPolicy.addAllSchedulableEntities( this.orderingPolicy.getSchedulableEntities()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index b5e870bf34f..f920db8cf58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -142,4 +143,6 @@ public abstract void containerReleased(S schedulableEntity, @Override public abstract String getConfigName(); + @Override + public abstract void initRMContext(RMContext rmContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java new file mode 100644 index 00000000000..c8a14fd8ea7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; + +/** + * An OrderingPolicy which orders SchedulableEntities by equalize dominant + * resource usage. + */ +public class DominantResourceFairnessOrderingPolicy + extends AbstractComparatorOrderingPolicy { + + private RMContext rmContext; + + public DominantResourceFairnessOrderingPolicy() { + this.schedulableEntities = + new ConcurrentSkipListSet(new DominantResourceFairnessComparator()); + } + + @Override + public void configure(Map conf) { + //no op + } + + @Override + public void containerAllocated(S schedulableEntity, RMContainer r) { + // no op + } + + @Override + public void containerReleased(S schedulableEntity, RMContainer r) { + // no op + } + + @Override + public void demandUpdated(S schedulableEntity) { + // no op + } + + @Override + public String getInfo() { + return "DominantResourceFairnessOrderingPolicy"; + } + + @Override + public String getConfigName() { + return CapacitySchedulerConfiguration.DRF_APP_ORDERING_POLICY; + } + + private class DominantResourceFairnessComparator + implements Comparator { + @Override + public int compare(final SchedulableEntity lhs, + final SchedulableEntity rhs) { + Resource clusterResource = rmContext.getScheduler().getClusterResource(); + + DominantResourceCalculator calc = new DominantResourceCalculator(); + + Resource lhsPending = lhs.getSchedulingResourceUsage() + .getPending(CommonNodeLabelsManager.ANY); + Resource rhsPending = rhs.getSchedulingResourceUsage() + .getPending(CommonNodeLabelsManager.ANY); + return calc.compare(clusterResource, lhsPending, rhsPending); + } + } + + @Override + public void initRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java index 1f4522ce3f9..76bbde327d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -125,4 +126,8 @@ public String getConfigName() { return CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY; } + @Override + public void initRMContext(RMContext rmContext) { + // no op + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java index 6e545a029b6..ca23d6779d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -68,4 +69,8 @@ public String getConfigName() { return CapacitySchedulerConfiguration.FIFO_APP_ORDERING_POLICY; } + @Override + public void initRMContext(RMContext rmContext) { + // no op + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java index 0007033e9b1..c65c93d1421 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java @@ -21,6 +21,7 @@ import java.util.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import java.util.concurrent.ConcurrentSkipListSet; @@ -77,4 +78,8 @@ public void containerReleased(S schedulableEntity, RMContainer r) { public void demandUpdated(S schedulableEntity) { } + @Override + public void initRMContext(RMContext rmContext) { + // no op + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java index e6f6139305a..465f232635d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -148,4 +149,8 @@ public String getConfigName() { partition : DEFAULT_PARTITION; return orderingPolicies.get(keyPartition); } -} + + @Override + public void initRMContext(RMContext rmContext) { + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java index 5c622282b8a..11ca817a39c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -134,5 +136,11 @@ * @return configuration name */ String getConfigName(); + + /** + * Initialize RMContext. + * @param rmContext + */ + void initRMContext(RMContext rmContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java new file mode 100644 index 00000000000..49e16978f1c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Iterator; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +public class TestDominantResourceFairnessOrderingPolicy { + + final static int GB = 1024; + + @Test + public void testIterators() { + + FiCaSchedulerApp app1 = mock(FiCaSchedulerApp.class); + FiCaSchedulerApp app2 = mock(FiCaSchedulerApp.class); + FiCaSchedulerApp app3 = mock(FiCaSchedulerApp.class); + + when(app1.getApplicationId()) + .thenReturn(ApplicationId.fromString("application_1234_11")); + when(app2.getApplicationId()) + .thenReturn(ApplicationId.fromString("application_1234_12")); + when(app3.getApplicationId()) + .thenReturn(ApplicationId.fromString("application_1234_13")); + + + CapacityScheduler scheduler = mock(CapacityScheduler.class); + when(scheduler.getClusterResource()) + .thenReturn(Resources.createResource(50 * GB, 50)); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getScheduler()).thenReturn(scheduler); + + OrderingPolicy schedOrder = + new DominantResourceFairnessOrderingPolicy(); + schedOrder.initRMContext(rmContext); + + // App 1 resource shares are 0.04 and 0.08 + Resource res1 = Resources.createResource(2 * GB, 4); + + // App 2 resource shares are 0.12 and 0.08 + Resource res2 = Resources.createResource(6 * GB, 4); + + // App 3 resource shares are 0.08 and 0.16 + Resource res3 = Resources.createResource(4 * GB, 8); + + ResourceUsage usage1 = mock(ResourceUsage.class); + when(usage1.getPending(CommonNodeLabelsManager.ANY)).thenReturn(res1); + when(app1.getSchedulingResourceUsage()).thenReturn(usage1); + + ResourceUsage usage2 = mock(ResourceUsage.class); + when(usage2.getPending(CommonNodeLabelsManager.ANY)).thenReturn(res2); + when(app2.getSchedulingResourceUsage()).thenReturn(usage2); + + ResourceUsage usage3 = mock(ResourceUsage.class); + when(usage3.getPending(CommonNodeLabelsManager.ANY)).thenReturn(res3); + when(app3.getSchedulingResourceUsage()).thenReturn(usage3); + + // Adding order: app3, app2 and app1 + schedOrder.addSchedulableEntity(app3); + schedOrder.addSchedulableEntity(app2); + schedOrder.addSchedulableEntity(app1); + + // Dominant share of app 3 > Dominant share of app 2 > Dominant share of app + // 1. Hence iterator result order: app1, app2, app3 + checkSerials( + schedOrder + .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), + new long[] { 11, 12, 13 }); + } + + public void checkSerials(Iterator si, long[] serials) { + for (int i = 0; i < serials.length; i++) { + long s = si.next().getApplicationId().getId(); + Assert.assertEquals(s, serials[i]); + } + } +} \ No newline at end of file