diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index c1ce6da2203..7334d689a74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -170,6 +170,8 @@ public static final String FIFO_FOR_PENDING_APPS = "fifo-for-pending-apps"; + public static final String DRF_APP_ORDERING_POLICY = "drf"; + public static final String DEFAULT_APP_ORDERING_POLICY = FIFO_APP_ORDERING_POLICY; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java new file mode 100644 index 00000000000..112ed0f552b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/DominantResourceFairnessOrderingPolicy.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; + +/** + * An OrderingPolicy which orders SchedulableEntities by equalize dominant + * resource usage. + */ +public class DominantResourceFairnessOrderingPolicy + extends AbstractComparatorOrderingPolicy { + + public DominantResourceFairnessOrderingPolicy() { + List> comparators = + new ArrayList>(); + comparators.add(new DominantResourceFairnessComparator()); + this.comparator = new CompoundComparator(comparators); + this.schedulableEntities = new ConcurrentSkipListSet(comparator); + } + + @Override + public void configure(Map conf) { + + } + + @Override + public void containerAllocated(S schedulableEntity, RMContainer r) { + } + + @Override + public void containerReleased(S schedulableEntity, RMContainer r) { + } + + @Override + public void demandUpdated(S schedulableEntity) { + } + + @Override + public String getInfo() { + return "DominantResourceFairnessOrderingPolicy"; + } + + @Override + public String getConfigName() { + return CapacitySchedulerConfiguration.DRF_APP_ORDERING_POLICY; + } + + private class DominantResourceFairnessComparator + implements Comparator { + @Override + public int compare(final SchedulableEntity lhs, + final SchedulableEntity rhs) { + FiCaSchedulerApp lhsApplication = (FiCaSchedulerApp) lhs; + CSQueue queue = (CSQueue) lhsApplication.getQueue(); + Resource queueEffectiveResource = + queue.getEffectiveCapacity(CommonNodeLabelsManager.ANY); + + DominantResourceCalculator calc = new DominantResourceCalculator(); + + Resource lhsPending = lhs.getSchedulingResourceUsage() + .getPending(CommonNodeLabelsManager.ANY); + Resource rhsPending = rhs.getSchedulingResourceUsage() + .getPending(CommonNodeLabelsManager.ANY); + return calc.compare(queueEffectiveResource, lhsPending, rhsPending); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java new file mode 100644 index 00000000000..345137ac16f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestDominantResourceFairnessOrderingPolicy.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Iterator; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +public class TestDominantResourceFairnessOrderingPolicy { + + final static int GB = 1024; + + @Test + public void testIterators() { + + FiCaSchedulerApp app1 = mock(FiCaSchedulerApp.class); + FiCaSchedulerApp app2 = mock(FiCaSchedulerApp.class); + FiCaSchedulerApp app3 = mock(FiCaSchedulerApp.class); + + when(app1.getApplicationId()) + .thenReturn(ApplicationId.fromString("application_1234_11")); + when(app2.getApplicationId()) + .thenReturn(ApplicationId.fromString("application_1234_12")); + when(app3.getApplicationId()) + .thenReturn(ApplicationId.fromString("application_1234_13")); + + CSQueue queue = mock(CSQueue.class); + + when(queue.getEffectiveCapacity(CommonNodeLabelsManager.ANY)) + .thenReturn(Resources.createResource(50 * GB, 50)); + when(app1.getQueue()).thenReturn(queue); + when(app2.getQueue()).thenReturn(queue); + when(app3.getQueue()).thenReturn(queue); + + OrderingPolicy schedOrder = + new DominantResourceFairnessOrderingPolicy(); + + // App 1 resource shares are 0.04 and 0.08 + Resource res1 = Resources.createResource(2 * GB, 4); + + // App 2 resource shares are 0.12 and 0.08 + Resource res2 = Resources.createResource(6 * GB, 4); + + // App 3 resource shares are 0.08 and 0.16 + Resource res3 = Resources.createResource(4 * GB, 8); + + ResourceUsage usage1 = mock(ResourceUsage.class); + when(usage1.getPending(CommonNodeLabelsManager.ANY)).thenReturn(res1); + when(app1.getSchedulingResourceUsage()).thenReturn(usage1); + + ResourceUsage usage2 = mock(ResourceUsage.class); + when(usage2.getPending(CommonNodeLabelsManager.ANY)).thenReturn(res2); + when(app2.getSchedulingResourceUsage()).thenReturn(usage2); + + ResourceUsage usage3 = mock(ResourceUsage.class); + when(usage3.getPending(CommonNodeLabelsManager.ANY)).thenReturn(res3); + when(app3.getSchedulingResourceUsage()).thenReturn(usage3); + + // Adding order: app3, app2 and app1 + schedOrder.addSchedulableEntity(app3); + schedOrder.addSchedulableEntity(app2); + schedOrder.addSchedulableEntity(app1); + + // Dominant share of app 3 > Dominant share of app 2 > Dominant share of app + // 1. Hence iterator result order: app1, app2, app3 + checkSerials( + schedOrder + .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), + new long[] { 11, 12, 13 }); + } + + public void checkSerials(Iterator si, long[] serials) { + for (int i = 0; i < serials.length; i++) { + long s = si.next().getApplicationId().getId(); + Assert.assertEquals(s, serials[i]); + } + } +} \ No newline at end of file