diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index ece8548..bcfb429 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -146,6 +146,10 @@
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index c9e83a1..2162636 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -398,6 +398,9 @@ public int getUserLimit(String queue) {
if (policyType.trim().equals("fifo")) {
policyType = FifoOrderingPolicy.class.getName();
}
+ if (policyType.trim().equals("fair")) {
+ policyType = FairOrderingPolicy.class.getName();
+ }
try {
orderingPolicy = (OrderingPolicy)
Class.forName(policyType).newInstance();
@@ -405,6 +408,13 @@ public int getUserLimit(String queue) {
String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
throw new RuntimeException(message, e);
}
+ Map config = new HashMap();
+ String sizeBasedWeight = get(getQueuePrefix(queue) + ORDERING_POLICY +
+ ".sizeBasedWeight");
+ if (sizeBasedWeight != null && !sizeBasedWeight.trim().equals("")) {
+ config.put("sizeBasedWeight", sizeBasedWeight);
+ }
+ orderingPolicy.configure(config);
return orderingPolicy;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
new file mode 100644
index 0000000..fec5cf4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+
+//Some policies will use multiple comparators joined together
+class CompoundComparator implements Comparator {
+
+ List> comparators;
+
+ CompoundComparator(List> comparators) {
+ this.comparators = comparators;
+ }
+
+ @Override
+ public int compare(SchedulableEntity r1, SchedulableEntity r2) {
+ for (Comparator comparator : comparators) {
+ int result = comparator.compare(r1, r2);
+ if (result != 0) return result;
+ }
+ return 0;
+ }
+}
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
new file mode 100644
index 0000000..7351976
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+/**
+ * An OrderingPolicy which orders SchedulableEntities for fairness (see
+ * FairScheduler
+ * FairSharePolicy), generally, processes with lesser usage are lesser. If
+ * sizedBasedWeight is set to true then an application with high demand
+ * may be prioritized ahead of an application with less usage. This
+ * is to offset the tendency to favor small apps, which could result in
+ * starvation for large apps if many small ones enter and leave the queue
+ * continuously (optional, default false)
+ */
+public class FairOrderingPolicy extends AbstractComparatorOrderingPolicy {
+
+ private static final Log LOG = LogFactory.getLog(FairOrderingPolicy.class);
+
+ protected class FairComparator implements Comparator {
+ @Override
+ public int compare(SchedulableEntity r1, SchedulableEntity r2) {
+ int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) );
+ return res;
+ }
+ }
+
+ private CompoundComparator fairComparator;
+
+ private boolean sizeBasedWeight = false;
+
+ public FairOrderingPolicy() {
+ List> comparators =
+ new ArrayList>();
+ comparators.add(new FairComparator());
+ comparators.add(new FifoComparator());
+ fairComparator = new CompoundComparator(
+ comparators
+ );
+ this.comparator = fairComparator;
+ this.schedulableEntities = new TreeSet(comparator);
+ }
+
+ public double getMagnitude(SchedulableEntity r) {
+ double mag = r.getSchedulingResourceUsage().getCachedUsed(
+ CommonNodeLabelsManager.ANY).getMemory();
+ if (sizeBasedWeight) {
+ double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand(
+ CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2);
+ mag = mag / weight;
+ }
+ return mag;
+ }
+
+ public boolean getSizeBasedWeight() {
+ return sizeBasedWeight;
+ }
+
+ public void setSizeBasedWeight(boolean sizeBasedWeight) {
+ this.sizeBasedWeight = sizeBasedWeight;
+ }
+
+ @Override
+ public void configure(Map conf) {
+ if (conf.containsKey("sizeBasedWeight")) {
+ sizeBasedWeight = Boolean.valueOf(conf.get("sizeBasedWeight"));
+ }
+ }
+
+ @Override
+ public void containerAllocated(S schedulableEntity,
+ RMContainer r) {
+ reorderSchedulableEntity(schedulableEntity);
+ }
+
+ @Override
+ public void containerReleased(S schedulableEntity,
+ RMContainer r) {
+ reorderSchedulableEntity(schedulableEntity);
+ }
+
+ @Override
+ public String getInfo() {
+ String sbw = sizeBasedWeight ? " with sizeBasedWeight" : "";
+ return "FairOrderingPolicy" + sbw;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 33b8f56..317c9d1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -84,6 +84,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -452,6 +453,40 @@ public void testAppAttemptMetrics() throws Exception {
QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);
assertEquals(1, userMetrics.getAppsSubmitted());
}
+
+ @Test
+ public void testFairConfiguration() throws Exception {
+
+ CapacitySchedulerConfiguration testConf =
+ new CapacitySchedulerConfiguration();
+
+ String tproot = CapacitySchedulerConfiguration.ROOT + "." +
+ "testPolicyRoot" + System.currentTimeMillis();
+
+ OrderingPolicy schedOrder =
+ testConf.getOrderingPolicy(tproot);
+
+ //override default to fair
+ String policyType = CapacitySchedulerConfiguration.PREFIX + tproot +
+ "." + CapacitySchedulerConfiguration.ORDERING_POLICY;
+
+ testConf.set(policyType, "fair");
+ schedOrder =
+ testConf.getOrderingPolicy(tproot);
+ FairOrderingPolicy fop = (FairOrderingPolicy) schedOrder;
+ assertFalse(fop.getSizeBasedWeight());
+
+ //Now with sizeBasedWeight
+ String sbwConfig = CapacitySchedulerConfiguration.PREFIX + tproot +
+ "." + CapacitySchedulerConfiguration.ORDERING_POLICY + "." +
+ "sizeBasedWeight";
+ testConf.set(sbwConfig, "true");
+ schedOrder =
+ testConf.getOrderingPolicy(tproot);
+ fop = (FairOrderingPolicy) schedOrder;
+ assertTrue(fop.getSizeBasedWeight());
+
+ }
@Test
public void testSingleQueueWithOneUser() throws Exception {
@@ -2614,6 +2649,86 @@ public void run() {
rm.stop();
}
+
+ @Test
+ public void testFairAssignment() throws Exception {
+
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+ OrderingPolicy schedulingOrder =
+ new FairOrderingPolicy();
+
+ a.setOrderingPolicy(schedulingOrder);
+
+ String host_0_0 = "127.0.0.1";
+ String rack_0 = "rack_0";
+ FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
+
+ final int numNodes = 4;
+ Resource clusterResource = Resources.createResource(
+ numNodes * (16*GB), numNodes * 16);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ String user_0 = "user_0";
+
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext));
+ a.submitApplicationAttempt(app_0, user_0);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 =
+ spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext));
+ a.submitApplicationAttempt(app_1, user_0);
+
+ Priority priority = TestUtils.createMockPriority(1);
+ List app_0_requests_0 = new ArrayList();
+ List app_1_requests_0 = new ArrayList();
+
+ app_0_requests_0.clear();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
+ true, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ app_1_requests_0.clear();
+ app_1_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+ true, priority, recordFactory));
+ app_1.updateResourceRequests(app_1_requests_0);
+
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+ app_0_requests_0.clear();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+ true, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ app_1_requests_0.clear();
+ app_1_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+ true, priority, recordFactory));
+ app_1.updateResourceRequests(app_1_requests_0);
+
+ //Since it already has more resources, app_0 will not get
+ //assigned first, but app_1 will
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+ Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+
+ //and only then will app_0
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+
+ }
private List createListOfApps(int noOfApps, String user,
LeafQueue defaultQueue) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
new file mode 100644
index 0000000..9b02982
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+
+public class TestFairOrderingPolicy {
+
+ final static int GB = 1024;
+
+ @Test
+ public void testSimpleComparison() {
+ FairOrderingPolicy policy =
+ new FairOrderingPolicy();
+ MockSchedulableEntity r1 = new MockSchedulableEntity();
+ MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+ Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
+
+ //consumption
+ r1.setUsed(Resources.createResource(1, 0));
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ r1.getSchedulingResourceUsage());
+ Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+ }
+
+ @Test
+ public void testSizeBasedWeight() {
+ FairOrderingPolicy policy =
+ new FairOrderingPolicy();
+ policy.setSizeBasedWeight(true);
+ MockSchedulableEntity r1 = new MockSchedulableEntity();
+ MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+ //No changes, equal
+ Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
+
+ r1.setUsed(Resources.createResource(4 * GB));
+ r2.setUsed(Resources.createResource(4 * GB));
+
+ r1.setPending(Resources.createResource(4 * GB));
+ r2.setPending(Resources.createResource(4 * GB));
+
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ r1.getSchedulingResourceUsage());
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ r2.getSchedulingResourceUsage());
+
+ //Same, equal
+ Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
+
+ r2.setUsed(Resources.createResource(5 * GB));
+ r2.setPending(Resources.createResource(5 * GB));
+
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ r2.getSchedulingResourceUsage());
+
+ //More demand and consumption, but not enough more demand to overcome
+ //additional consumption
+ Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
+
+ //High demand, enough to reverse sbw
+ r2.setPending(Resources.createResource(100 * GB));
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ r2.getSchedulingResourceUsage());
+ Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+ }
+
+ @Test
+ public void testIterators() {
+ OrderingPolicy schedOrder =
+ new FairOrderingPolicy();
+
+ MockSchedulableEntity msp1 = new MockSchedulableEntity();
+ MockSchedulableEntity msp2 = new MockSchedulableEntity();
+ MockSchedulableEntity msp3 = new MockSchedulableEntity();
+
+ msp1.setId("1");
+ msp2.setId("2");
+ msp3.setId("3");
+
+ msp1.setUsed(Resources.createResource(3));
+ msp2.setUsed(Resources.createResource(2));
+ msp3.setUsed(Resources.createResource(1));
+
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ msp1.getSchedulingResourceUsage());
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ msp2.getSchedulingResourceUsage());
+ AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+ msp2.getSchedulingResourceUsage());
+
+ schedOrder.addSchedulableEntity(msp1);
+ schedOrder.addSchedulableEntity(msp2);
+ schedOrder.addSchedulableEntity(msp3);
+
+
+ //Assignment, least to greatest consumption
+ checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+
+ //Preemption, greatest to least
+ checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
+
+ //Change value without inform, should see no change
+ msp2.setUsed(Resources.createResource(6));
+ checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+ checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
+
+ //Do inform, will reorder
+ schedOrder.containerAllocated(msp2, null);
+ checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+ checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
+ }
+
+ public void checkIds(Iterator si,
+ String[] ids) {
+ for (int i = 0;i < ids.length;i++) {
+ Assert.assertEquals(si.next().getId(),
+ ids[i]);
+ }
+ }
+
+}