diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 9a5bc79ae08..767f4f42a0f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -66,8 +66,8 @@
// copy array, etc.
protected static final int NUM_MANDATORY_RESOURCES = 2;
- protected static final int MEMORY_INDEX = 0;
- protected static final int VCORES_INDEX = 1;
+ public static final int MEMORY_INDEX = 0;
+ public static final int VCORES_INDEX = 1;
@Public
@Stable
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index e7a76e85bc0..9ce88c95739 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.api.records;
import com.google.common.collect.ImmutableMap;
-import org.apache.curator.shaded.com.google.common.reflect.ClassPath;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index e58b3572968..52fc66126d6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -125,7 +125,185 @@ public void setFSContext(FSContext fsContext) {
@Override
public int compare(Schedulable s1, Schedulable s2) {
- ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
+ int n = ResourceUtils.getNumberOfKnownResourceTypes();
+
+ if (n == 2) {
+ // To improve performance, if we know we're dealing with the common
+ // case of only CPU and memory, then handle CPU and memory explicitly.
+ return compare2(s1, s2);
+ } else {
+ // Otherwise, do it the generic way.
+ return compareN(n, s1, s2);
+ }
+ }
+
+ /**
+ * Do the comparison assuming only 2 resource types, which lets us use the
+ * CPU and memory indices directly, avoiding iterating over the resource
+ * types array.
+ *
+ * @param n the number of resource types
+ * @param s1 the first item to compare
+ * @param s2 the second item to compare
+ * @return -1 if the first item is larger, 1 if the second is larger, and
+ * 0 if they're both equal
+ */
+ private int compare2(Schedulable s1, Schedulable s2) {
+ ResourceInformation[] resourceInfo1 =
+ s1.getResourceUsage().getResources();
+ ResourceInformation[] resourceInfo2 =
+ s2.getResourceUsage().getResources();
+ ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
+ ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
+ ResourceInformation[] clusterInfo =
+ fsContext.getClusterResource().getResources();
+ double[] shares1 = new double[2];
+ double[] shares2 = new double[2];
+
+ int dominant1 = calculateClusterAndFairRatios2(resourceInfo1,
+ s1.getWeight(), clusterInfo, shares1);
+ int dominant2 = calculateClusterAndFairRatios2(resourceInfo2,
+ s2.getWeight(), clusterInfo, shares2);
+
+
+ // A queue is needy for its min share if its dominant resource
+ // (with respect to the cluster capacity) is below its configured min
+ // share for that resource
+ boolean s1Needy = resourceInfo1[dominant1].getValue() <
+ minShareInfo1[dominant1].getValue();
+ boolean s2Needy = resourceInfo1[dominant2].getValue() <
+ minShareInfo2[dominant2].getValue();
+
+ int res;
+
+ if (!s2Needy && !s1Needy) {
+ res = (int) Math.signum(shares1[dominant1] - shares2[dominant2]);
+
+ if (res == 0) {
+ res = (int) Math.signum(shares1[1 - dominant1] -
+ shares2[1 - dominant2]);
+ }
+ } else if (s1Needy && !s2Needy) {
+ res = -1;
+ } else if (s2Needy && !s1Needy) {
+ res = 1;
+ } else {
+ double[] minShares1 =
+ calculateMinShareRatios2(resourceInfo1, minShareInfo1);
+ double[] minShares2 =
+ calculateMinShareRatios2(resourceInfo2, minShareInfo2);
+
+ res = (int) Math.signum(minShares1[dominant1] - minShares2[dominant2]);
+
+ if (res == 0) {
+ res = (int) Math.signum(minShares1[1 - dominant1] -
+ minShares2[1 - dominant2]);
+ }
+ }
+
+ if (res == 0) {
+ // Apps are tied in fairness ratio. Break the tie by submit time and job
+ // name to get a deterministic ordering, which is useful for unit tests.
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+
+ if (res == 0) {
+ res = s1.getName().compareTo(s2.getName());
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Calculate a resource's usage ratio and approximate fair share ratio
+ * assuming that CPU and memory are the only configured resource types.
+ * The {@code shares} array will be populated with the approximate fair
+ * share ratio for each resource type. The approximate fair share ratio
+ * is calculated as {@code resourceInfo} divided by {@code cluster} and
+ * the {@code weight}. If the cluster's resources are 100MB and
+ * 10 vcores, the usage ({@code resourceInfo}) is 10 MB and 5 CPU, and the
+ * weights are 2, the fair share ratios will be 0.05 and 0.25.
+ *
+ * The approximate fair share ratio is the usage divided by the
+ * approximate fair share, i.e. the cluster resources times the weight.
+ * The approximate fair share is an acceptable proxy for the fair share
+ * because when comparing resources, the resource with the higher weight
+ * will be assigned by the scheduler a proportionally higher fair share.
+ *
+ * The length of the {@code shares} array must be at least 2.
+ *
+ * The return value will be the index of the dominant resource type in the
+ * {@code shares} array. The dominant resource is the resource type for
+ * which {@code resourceInfo} has the largest usage ratio.
+ *
+ * @param resourceInfo the resource for which to calculate ratios
+ * @param weight the resource weight
+ * @param clusterInfo the total cluster resources
+ * @param shares the share ratios array to populate
+ * @return the index of the resource type with the largest cluster share
+ */
+ @VisibleForTesting
+ int calculateClusterAndFairRatios2(ResourceInformation[] resourceInfo,
+ float weight, ResourceInformation[] clusterInfo, double[] shares) {
+ int dominant;
+
+ shares[Resource.MEMORY_INDEX] =
+ ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) /
+ clusterInfo[Resource.MEMORY_INDEX].getValue();
+ shares[Resource.VCORES_INDEX] =
+ ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) /
+ clusterInfo[Resource.VCORES_INDEX].getValue();
+ dominant =
+ shares[Resource.VCORES_INDEX] > shares[Resource.MEMORY_INDEX] ?
+ Resource.VCORES_INDEX : Resource.MEMORY_INDEX;
+
+ shares[Resource.MEMORY_INDEX] /= weight;
+ shares[Resource.VCORES_INDEX] /= weight;
+
+ return dominant;
+ }
+
+ /**
+ * Calculate a resource's min share ratios assuming that CPU and memory
+ * are the only configured resource types. The return array will be
+ * populated with the {@code resourceInfo} divided by {@code minShareInfo}
+ * for each resource type. If the min shares are 5 MB and 10 vcores, and
+ * the usage ({@code resourceInfo}) is 10 MB and 5 CPU, the ratios will
+ * be 2 and 0.5.
+ *
+ * The length of the {@code ratios} array must be 2.
+ *
+ * @param resourceInfo the resource for which to calculate min shares
+ * @param minShareInfo the min share
+ * @return the share ratios
+ */
+ @VisibleForTesting
+ double[] calculateMinShareRatios2(ResourceInformation[] resourceInfo,
+ ResourceInformation[] minShareInfo) {
+ double[] minShares1 = new double[2];
+
+ // both are needy below min share
+ minShares1[Resource.MEMORY_INDEX] =
+ ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) /
+ minShareInfo[Resource.MEMORY_INDEX].getValue();
+ minShares1[Resource.VCORES_INDEX] =
+ ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) /
+ minShareInfo[Resource.VCORES_INDEX].getValue();
+
+ return minShares1;
+ }
+
+ /**
+ * Do the comparison assuming more than 2 resource types, which involves
+ * iterating through the resource types array.
+ *
+ * @param n the number of resource types
+ * @param s1 the first item to compare
+ * @param s2 the second item to compare
+ * @return -1 if the first item is larger, 1 if the second is larger, and
+ * 0 if they're both equal
+ */
+ private int compareN(int n, Schedulable s1, Schedulable s2) {
Resource usage1 = s1.getResourceUsage();
Resource usage2 = s2.getResourceUsage();
Resource minShare1 = s1.getMinShare();
@@ -135,8 +313,8 @@ public int compare(Schedulable s1, Schedulable s2) {
// These arrays hold the usage, fair, and min share ratios for each
// resource type. ratios[0][x] are the usage ratios, ratios[1][x] are
// the fair share ratios, and ratios[2][x] are the min share ratios.
- float[][] ratios1 = new float[info.length][3];
- float[][] ratios2 = new float[info.length][3];
+ float[][] ratios1 = new float[n][3];
+ float[][] ratios2 = new float[n][3];
// Calculate cluster shares and approximate fair shares for each
// resource type of both schedulables.
@@ -155,7 +333,7 @@ public int compare(Schedulable s1, Schedulable s2) {
usage2.getResources()[dominant2].getValue() <
minShare2.getResources()[dominant2].getValue();
- int res = 0;
+ int res;
if (!s2Needy && !s1Needy) {
// Sort shares by usage ratio and compare them by approximate fair share
@@ -206,7 +384,7 @@ void sortRatios(float[][] ratios1, float[][]ratios2) {
/**
* Calculate a resource's usage ratio and approximate fair share ratio.
- * The {@code shares} array will be populated with both the usage ratio
+ * The {@code ratios} array will be populated with both the usage ratio
* and the approximate fair share ratio for each resource type. The usage
* ratio is calculated as {@code resource} divided by {@code cluster}.
* The approximate fair share ratio is calculated as the usage ratio
@@ -221,18 +399,18 @@ void sortRatios(float[][] ratios1, float[][]ratios2) {
* because when comparing resources, the resource with the higher weight
* will be assigned by the scheduler a proportionally higher fair share.
*
- * The {@code shares} array must be at least n x 2, where n
+ * The {@code ratios} array must be at least n x 2, where n
* is the number of resource types. Only the first and second indices of
- * the inner arrays in the {@code shares} array will be used, e.g.
- * {@code shares[x][0]} and {@code shares[x][1]}.
+ * the inner arrays in the {@code ratios} array will be used, e.g.
+ * {@code ratios[x][0]} and {@code ratios[x][1]}.
*
* The return value will be the index of the dominant resource type in the
- * {@code shares} array. The dominant resource is the resource type for
+ * {@code ratios} array. The dominant resource is the resource type for
* which {@code resource} has the largest usage ratio.
*
* @param resource the resource for which to calculate ratios
* @param cluster the total cluster resources
- * @param ratios the shares array to populate
+ * @param ratios the share ratios array to populate
* @param weight the resource weight
* @return the index of the resource type with the largest cluster share
*/
@@ -275,7 +453,7 @@ int calculateClusterAndFairRatios(Resource resource, Resource cluster,
*
* @param resource the resource for which to calculate min shares
* @param minShare the min share
- * @param ratios the shares array to populate
+ * @param ratios the share ratios array to populate
*/
@VisibleForTesting
void calculateMinShareRatios(Resource resource, Resource minShare,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
index 097558feb18..02b7eca729b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
@@ -24,9 +24,9 @@
import java.util.Comparator;
import java.util.Map;
+
import org.apache.curator.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -38,7 +38,7 @@
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
/**
@@ -46,8 +46,8 @@
* container before sched2
*/
public class TestDominantResourceFairnessPolicy {
- @BeforeClass
- public static void setup() {
+ @Before
+ public void setup() {
addResources("test");
}
@@ -77,7 +77,6 @@ private Schedulable createSchedulable(int memUsage, int cpuUsage,
return createSchedulable(memUsage, cpuUsage, weights, 0, 0);
}
-
private Schedulable createSchedulable(int memUsage, int cpuUsage,
float weights, int minMemShare, int minCpuShare) {
Resource usage = BuilderUtils.newResource(memUsage, cpuUsage);
@@ -98,6 +97,12 @@ public void testSameDominantResource() {
}
@Test
+ public void testSameDominantResource2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ testSameDominantResource();
+ }
+
+ @Test
public void testDifferentDominantResource() {
Comparator c = createComparator(8000, 8);
Schedulable s1 = createSchedulable(4000, 3);
@@ -108,6 +113,12 @@ public void testDifferentDominantResource() {
}
@Test
+ public void testDifferentDominantResource2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ testDifferentDominantResource();
+ }
+
+ @Test
public void testOneIsNeedy() {
Comparator c = createComparator(8000, 8);
Schedulable s1 = createSchedulable(2000, 5, 0, 6);
@@ -118,6 +129,12 @@ public void testOneIsNeedy() {
}
@Test
+ public void testOneIsNeedy2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ testOneIsNeedy();
+ }
+
+ @Test
public void testBothAreNeedy() {
Comparator c = createComparator(8000, 100);
// dominant share is 2000/8000
@@ -138,6 +155,12 @@ public void testBothAreNeedy() {
}
@Test
+ public void testBothAreNeedy2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ testBothAreNeedy();
+ }
+
+ @Test
public void testEvenWeightsSameDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, 2.0f),
@@ -148,6 +171,12 @@ public void testEvenWeightsSameDominantResource() {
}
@Test
+ public void testEvenWeightsSameDominantResource2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ testEvenWeightsSameDominantResource();
+ }
+
+ @Test
public void testEvenWeightsDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, 2.0f),
@@ -158,6 +187,12 @@ public void testEvenWeightsDifferentDominantResource() {
}
@Test
+ public void testEvenWeightsDifferentDominantResource2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ testEvenWeightsDifferentDominantResource();
+ }
+
+ @Test
public void testSortShares() {
float[][] ratios1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}};
float[][] ratios2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}};
@@ -207,6 +242,26 @@ public void testCalculateClusterAndFairRatios() {
}
@Test
+ public void testCalculateClusterAndFairRatios2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ Resource used = Resources.createResource(10, 5);
+ Resource capacity = Resources.createResource(100, 10);
+ double[] shares = new double[2];
+ DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+ int dominant =
+ comparator.calculateClusterAndFairRatios2(used.getResources(), 1.0f,
+ capacity.getResources(), shares);
+
+ assertEquals("Calculated usage ratio for memory (10MB out of 100MB) is "
+ + "incorrect", 0.1, shares[Resource.MEMORY_INDEX], .00001);
+ assertEquals("Calculated usage ratio for vcores (5 out of 10) is "
+ + "incorrect", 0.5, shares[Resource.VCORES_INDEX], .00001);
+ assertEquals("The wrong dominant resource index was returned",
+ Resource.VCORES_INDEX, dominant);
+ }
+
+ @Test
public void testCalculateMinShareRatios() {
Map index = ResourceUtils.getResourceTypeIndex();
Resource used = Resources.createResource(10, 5);
@@ -232,6 +287,24 @@ public void testCalculateMinShareRatios() {
}
@Test
+ public void testCalculateMinShareRatios2() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ Resource used = Resources.createResource(10, 5);
+ Resource minShares = Resources.createResource(5, 10);
+ DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+
+ double[] ratios =
+ comparator.calculateMinShareRatios2(used.getResources(),
+ minShares.getResources());
+
+ assertEquals("Calculated min share ratio for memory (10MB out of 5MB) is "
+ + "incorrect", 2.0, ratios[Resource.MEMORY_INDEX], .00001f);
+ assertEquals("Calculated min share ratio for vcores (5 out of 10) is "
+ + "incorrect", 0.5, ratios[Resource.VCORES_INDEX], .00001f);
+ }
+
+ @Test
public void testCompareShares() {
float[][] ratios1 = {
{0.4f, 0.1f, 2.0f},