diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairComparator.java new file mode 100644 index 0000000..a0aa61c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairComparator.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +/** + * A Comparator which orders SchedulerProcesses for fairness (see FairScheduler + * FairSharePolicy), generally, processes with lesser usage are lesser. If + * sizedBasedWeight is set to true then an application with high demand + * may actually be prioritized ahead of an application with less usage. This + * is to offset the tendency to favor small apps, which could result in + * starvation for large apps if many small ones enter and leave the queue + * continuously (optional, default false) + */ +public class FairComparator implements SchedulerComparator { + private static final Log LOG = LogFactory.getLog(FairComparator.class); + + private boolean sizeBasedWeight = false; + + public double getMagnitude(SchedulerProcess r) { + double mag = r.getCachedConsumption().getMemory(); + if (sizeBasedWeight) { + double weight = Math.log1p(r.getCachedDemand().getMemory()) / Math.log(2); + mag = mag / weight; + } + return mag; + } + + @Override + public int compare(SchedulerProcess r1, SchedulerProcess r2) { + int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) ); + if (res == 0) { + //cannot return equality for different processses, will result in + //"loss". process must always have unique ids to use as a fallback + res = r1.getId().compareTo(r2.getId()); + } + return res; + } + + @Override + public boolean isReorderingEvent( + SchedulerProcess.SchedulerProcessEvent event) { + return true; + } + + @Override + public void configure(String conf) { + if (conf.indexOf("sizeBasedWeight") >= 0) { + sizeBasedWeight = true; + } + } + + public boolean getSizeBasedWeight() { + return sizeBasedWeight; + } + + public void setSizeBasedWeight(boolean sizeBasedWeight) { + this.sizeBasedWeight = sizeBasedWeight; + } + + @Override + public String getInfo() { + String sbw = sizeBasedWeight ? " with sizeBasedWeight" : ""; + return "FairComparator" + sbw; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 972cabb..a9ad062 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairComparator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -435,6 +436,41 @@ public void testAppAttemptMetrics() throws Exception { QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0); assertEquals(1, userMetrics.getAppsSubmitted()); } + + @Test + public void testFairConfiguration() throws Exception { + + CapacitySchedulerConfiguration testConf = + new CapacitySchedulerConfiguration(); + + String tproot = CapacitySchedulerConfiguration.ROOT + "." + + "testPolicyRoot" + System.currentTimeMillis(); + + OrderingPolicy policy = + testConf.getOrderingPolicy(tproot); + + //override comparator default to fair + String comparatorConfig = CapacitySchedulerConfiguration.PREFIX + tproot + + "." + CapacitySchedulerConfiguration.COMPARATOR_CONFIGURATION; + + testConf.set(comparatorConfig, "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairComparator"); + policy = + testConf.getOrderingPolicy(tproot); + SchedulerComparatorPolicy comPol = + (SchedulerComparatorPolicy) policy; + FairComparator ccomp = (FairComparator) comPol.getSchedulerComparator(); + assertFalse(ccomp.getSizeBasedWeight()); + + //Now with sizeBasedWeight + testConf.set(comparatorConfig, "org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairComparator:sizeBasedWeight"); + policy = + testConf.getOrderingPolicy(tproot); + comPol = + (SchedulerComparatorPolicy) policy; + ccomp = (FairComparator) comPol.getSchedulerComparator(); + assertTrue(ccomp.getSizeBasedWeight()); + + } @Test public void testSingleQueueWithOneUser() throws Exception { @@ -2520,6 +2556,88 @@ public void run() { rm.stop(); } + + @Test + public void testFairAssignment() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + CompoundComparator comp = new CompoundComparator(); + comp.getComparators().add(new FairComparator()); + comp.getComparators().add(new FifoComparator()); + + a.setOrderingPolicy( + new SchedulerComparatorPolicy(comp)); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); + + final int numNodes = 4; + Resource clusterResource = Resources.createResource( + numNodes * (16*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + List app_1_requests_0 = new ArrayList(); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + //Since it already has more resources, app_0 will not get + //assigned first, but app_1 will + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + + //and only then will app_0 + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource)); + Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + + } private List createListOfApps(int noOfApps, String user, LeafQueue defaultQueue) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFair.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFair.java new file mode 100644 index 0000000..23b4492 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestComparatorPolicyFair.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestComparatorPolicyFair { + + @Test + public void testIterators() { + SchedulerComparatorPolicy scp = + new SchedulerComparatorPolicy(new FairComparator()); + + MockSchedulerProcess msp1 = new MockSchedulerProcess(); + MockSchedulerProcess msp2 = new MockSchedulerProcess(); + MockSchedulerProcess msp3 = new MockSchedulerProcess(); + + msp1.setConsumption(Resources.createResource(3)); + msp2.setConsumption(Resources.createResource(2)); + msp3.setConsumption(Resources.createResource(1)); + + msp1.updateCaches(); + msp2.updateCaches(); + msp3.updateCaches(); + + scp.getSchedulerProcesses().add(msp1); + scp.getSchedulerProcesses().add(msp2); + scp.getSchedulerProcesses().add(msp3); + + + //Assignment, least to greatest consumption + checkConsumption(scp.getAssignmentIterator(), new int[]{1, 2, 3}); + + //Preemption, greatest to least + checkConsumption(scp.getPreemptionIterator(), new int[]{3, 2, 1}); + + //Change value without spe, should see no change + msp2.setConsumption(Resources.createResource(6)); + checkConsumption(scp.getAssignmentIterator(), new int[]{1, 2, 3}); + checkConsumption(scp.getPreemptionIterator(), new int[]{3, 2, 1}); + + //Do spe, will reorder + scp.handleSchedulerProcessEvent(msp2, + SchedulerProcess.SchedulerProcessEvent.CONTAINER_ALLOCATED); + checkConsumption(scp.getAssignmentIterator(), new int[]{1, 3, 6}); + checkConsumption(scp.getPreemptionIterator(), new int[]{6, 3, 1}); + + } + + public void checkConsumption(Iterator si, + int[] demands) { + for (int i = 0;i < demands.length;i++) { + Assert.assertEquals(si.next().getCachedConsumption().getMemory(), + demands[i]); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairComparator.java new file mode 100644 index 0000000..2f0042f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairComparator.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestFairComparator { + + final static int GB = 1024; + + @Test + public void testSimpleComparison() { + Comparator comp = new FairComparator(); + MockSchedulerProcess r1 = new MockSchedulerProcess(); + MockSchedulerProcess r2 = new MockSchedulerProcess(); + + Assert.assertTrue(comp.compare(r1, r2) == 0); + + //consumption + r1.setConsumption(Resources.createResource(1, 0)); + r1.updateCaches(); + Assert.assertTrue(comp.compare(r1, r2) > 0); + + } + + @Test + public void testSizeBasedWeight() { + FairComparator comp = new FairComparator(); + comp.setSizeBasedWeight(true); + MockSchedulerProcess r1 = new MockSchedulerProcess(); + MockSchedulerProcess r2 = new MockSchedulerProcess(); + + //No changes, equal + Assert.assertTrue(comp.compare(r1, r2) == 0); + + r1.setConsumption(Resources.createResource(4 * GB)); + r2.setConsumption(Resources.createResource(4 * GB)); + + r1.setDemand(Resources.createResource(4 * GB)); + r2.setDemand(Resources.createResource(4 * GB)); + + r1.updateCaches(); + r2.updateCaches(); + + //Same, equal + Assert.assertTrue(comp.compare(r1, r2) == 0); + + r2.setConsumption(Resources.createResource(5 * GB)); + r2.setDemand(Resources.createResource(5 * GB)); + + r2.updateCaches(); + + //More demand and consumption, but not enough more demand to overcome + //additional consumption + Assert.assertTrue(comp.compare(r1, r2) < 0); + + //High demand, enough to reverse sbw + r2.setDemand(Resources.createResource(100 * GB)); + r2.updateCaches(); + Assert.assertTrue(comp.compare(r1, r2) > 0); + + } + +}