diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 9a5bc79ae08..767f4f42a0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -66,8 +66,8 @@ // copy array, etc. protected static final int NUM_MANDATORY_RESOURCES = 2; - protected static final int MEMORY_INDEX = 0; - protected static final int VCORES_INDEX = 1; + public static final int MEMORY_INDEX = 0; + public static final int VCORES_INDEX = 1; @Public @Stable diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index e7a76e85bc0..9ce88c95739 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.api.records; import com.google.common.collect.ImmutableMap; -import org.apache.curator.shaded.com.google.common.reflect.ClassPath; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.util.UnitsConversionUtil; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index e58b3572968..ea6a5bb86bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -48,7 +48,9 @@ public static final String NAME = "DRF"; private static final DominantResourceFairnessComparator COMPARATOR = - new DominantResourceFairnessComparator(); + new DominantResourceFairnessComparatorN(); + private static final DominantResourceFairnessComparator COMPARATOR2 = + new DominantResourceFairnessComparator2(); private static final DominantResourceCalculator CALCULATOR = new DominantResourceCalculator(); @@ -59,7 +61,17 @@ public String getName() { @Override public Comparator getComparator() { - return COMPARATOR; + int n = ResourceUtils.getNumberOfKnownResourceTypes(); + + if (n == 2) { + // To improve performance, if we know we're dealing with the common + // case of only CPU and memory, then handle CPU and memory explicitly. + return COMPARATOR2; + } else { + // Otherwise, do it the generic way. + return COMPARATOR; + } + } @Override @@ -113,19 +125,42 @@ public void initialize(FSContext fsContext) { /** * This class compares two {@link Schedulable} instances according to the * DRF policy. If neither instance is below min share, approximate fair share - * ratios are compared. + * ratios are compared. Subclasses of this class will do the actual work of + * the comparison, specialized for the number of configured resource types. */ - public static class DominantResourceFairnessComparator + public abstract static class DominantResourceFairnessComparator implements Comparator { - private FSContext fsContext; + protected FSContext fsContext; public void setFSContext(FSContext fsContext) { this.fsContext = fsContext; } + protected int compareAttribrutes(Schedulable s1, Schedulable s2) { + // Apps are tied in fairness ratio. Break the tie by submit time and job + // name to get a deterministic ordering, which is useful for unit tests. + int res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } + + return res; + } + } + + /** + * This class compares two {@link Schedulable} instances according to the + * DRF policy. If neither instance is below min share, approximate fair share + * ratios are compared. This class makes no assumptions about the number of + * resource types. + */ + public static class DominantResourceFairnessComparatorN + extends DominantResourceFairnessComparator { @Override public int compare(Schedulable s1, Schedulable s2) { - ResourceInformation[] info = ResourceUtils.getResourceTypesArray(); + int n = ResourceUtils.getNumberOfKnownResourceTypes(); + Resource usage1 = s1.getResourceUsage(); Resource usage2 = s2.getResourceUsage(); Resource minShare1 = s1.getMinShare(); @@ -135,8 +170,8 @@ public int compare(Schedulable s1, Schedulable s2) { // These arrays hold the usage, fair, and min share ratios for each // resource type. ratios[0][x] are the usage ratios, ratios[1][x] are // the fair share ratios, and ratios[2][x] are the min share ratios. - float[][] ratios1 = new float[info.length][3]; - float[][] ratios2 = new float[info.length][3]; + float[][] ratios1 = new float[n][3]; + float[][] ratios2 = new float[n][3]; // Calculate cluster shares and approximate fair shares for each // resource type of both schedulables. @@ -155,7 +190,7 @@ public int compare(Schedulable s1, Schedulable s2) { usage2.getResources()[dominant2].getValue() < minShare2.getResources()[dominant2].getValue(); - int res = 0; + int res; if (!s2Needy && !s1Needy) { // Sort shares by usage ratio and compare them by approximate fair share @@ -176,13 +211,7 @@ public int compare(Schedulable s1, Schedulable s2) { } if (res == 0) { - // Apps are tied in fairness ratio. Break the tie by submit time and job - // name to get a deterministic ordering, which is useful for unit tests. - res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - - if (res == 0) { - res = s1.getName().compareTo(s2.getName()); - } + res = compareAttribrutes(s1, s2); } return res; @@ -206,7 +235,7 @@ void sortRatios(float[][] ratios1, float[][]ratios2) { /** * Calculate a resource's usage ratio and approximate fair share ratio. - * The {@code shares} array will be populated with both the usage ratio + * The {@code ratios} array will be populated with both the usage ratio * and the approximate fair share ratio for each resource type. The usage * ratio is calculated as {@code resource} divided by {@code cluster}. * The approximate fair share ratio is calculated as the usage ratio @@ -221,18 +250,18 @@ void sortRatios(float[][] ratios1, float[][]ratios2) { * because when comparing resources, the resource with the higher weight * will be assigned by the scheduler a proportionally higher fair share. * - * The {@code shares} array must be at least n x 2, where n + * The {@code ratios} array must be at least n x 2, where n * is the number of resource types. Only the first and second indices of - * the inner arrays in the {@code shares} array will be used, e.g. - * {@code shares[x][0]} and {@code shares[x][1]}. + * the inner arrays in the {@code ratios} array will be used, e.g. + * {@code ratios[x][0]} and {@code ratios[x][1]}. * * The return value will be the index of the dominant resource type in the - * {@code shares} array. The dominant resource is the resource type for + * {@code ratios} array. The dominant resource is the resource type for * which {@code resource} has the largest usage ratio. * * @param resource the resource for which to calculate ratios * @param cluster the total cluster resources - * @param ratios the shares array to populate + * @param ratios the share ratios array to populate * @param weight the resource weight * @return the index of the resource type with the largest cluster share */ @@ -275,7 +304,7 @@ int calculateClusterAndFairRatios(Resource resource, Resource cluster, * * @param resource the resource for which to calculate min shares * @param minShare the min share - * @param ratios the shares array to populate + * @param ratios the share ratios array to populate */ @VisibleForTesting void calculateMinShareRatios(Resource resource, Resource minShare, @@ -320,4 +349,154 @@ int compareRatios(float[][] ratios1, float[][] ratios2, int index) { return ret; } } + + /** + * This class compares two {@link Schedulable} instances according to the + * DRF policy in the special case that only CPU and memory are configured. + * If neither instance is below min share, approximate fair share + * ratios are compared. + */ + public static class DominantResourceFairnessComparator2 + extends DominantResourceFairnessComparator { + @Override + public int compare(Schedulable s1, Schedulable s2) { + ResourceInformation[] resourceInfo1 = + s1.getResourceUsage().getResources(); + ResourceInformation[] resourceInfo2 = + s2.getResourceUsage().getResources(); + ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources(); + ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources(); + ResourceInformation[] clusterInfo = + fsContext.getClusterResource().getResources(); + double[] shares1 = new double[2]; + double[] shares2 = new double[2]; + + int dominant1 = calculateClusterAndFairRatios(resourceInfo1, + s1.getWeight(), clusterInfo, shares1); + int dominant2 = calculateClusterAndFairRatios(resourceInfo2, + s2.getWeight(), clusterInfo, shares2); + + // A queue is needy for its min share if its dominant resource + // (with respect to the cluster capacity) is below its configured min + // share for that resource + boolean s1Needy = resourceInfo1[dominant1].getValue() < + minShareInfo1[dominant1].getValue(); + boolean s2Needy = resourceInfo1[dominant2].getValue() < + minShareInfo2[dominant2].getValue(); + + int res; + + if (!s2Needy && !s1Needy) { + res = (int) Math.signum(shares1[dominant1] - shares2[dominant2]); + + if (res == 0) { + // Because memory and CPU are indices 0 and 1, we can find the + // non-dominant index by subtracting the dominant index from 1. + res = (int) Math.signum(shares1[1 - dominant1] - + shares2[1 - dominant2]); + } + } else if (s1Needy && !s2Needy) { + res = -1; + } else if (s2Needy && !s1Needy) { + res = 1; + } else { + double[] minShares1 = + calculateMinShareRatios(resourceInfo1, minShareInfo1); + double[] minShares2 = + calculateMinShareRatios(resourceInfo2, minShareInfo2); + + res = (int) Math.signum(minShares1[dominant1] - minShares2[dominant2]); + + if (res == 0) { + res = (int) Math.signum(minShares1[1 - dominant1] - + minShares2[1 - dominant2]); + } + } + + if (res == 0) { + res = compareAttribrutes(s1, s2); + } + + return res; + } + + /** + * Calculate a resource's usage ratio and approximate fair share ratio + * assuming that CPU and memory are the only configured resource types. + * The {@code shares} array will be populated with the approximate fair + * share ratio for each resource type. The approximate fair share ratio + * is calculated as {@code resourceInfo} divided by {@code cluster} and + * the {@code weight}. If the cluster's resources are 100MB and + * 10 vcores, the usage ({@code resourceInfo}) is 10 MB and 5 CPU, and the + * weights are 2, the fair share ratios will be 0.05 and 0.25. + * + * The approximate fair share ratio is the usage divided by the + * approximate fair share, i.e. the cluster resources times the weight. + * The approximate fair share is an acceptable proxy for the fair share + * because when comparing resources, the resource with the higher weight + * will be assigned by the scheduler a proportionally higher fair share. + * + * The length of the {@code shares} array must be at least 2. + * + * The return value will be the index of the dominant resource type in the + * {@code shares} array. The dominant resource is the resource type for + * which {@code resourceInfo} has the largest usage ratio. + * + * @param resourceInfo the resource for which to calculate ratios + * @param weight the resource weight + * @param clusterInfo the total cluster resources + * @param shares the share ratios array to populate + * @return the index of the resource type with the largest cluster share + */ + @VisibleForTesting + int calculateClusterAndFairRatios(ResourceInformation[] resourceInfo, + float weight, ResourceInformation[] clusterInfo, double[] shares) { + int dominant; + + shares[Resource.MEMORY_INDEX] = + ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) / + clusterInfo[Resource.MEMORY_INDEX].getValue(); + shares[Resource.VCORES_INDEX] = + ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) / + clusterInfo[Resource.VCORES_INDEX].getValue(); + dominant = + shares[Resource.VCORES_INDEX] > shares[Resource.MEMORY_INDEX] ? + Resource.VCORES_INDEX : Resource.MEMORY_INDEX; + + shares[Resource.MEMORY_INDEX] /= weight; + shares[Resource.VCORES_INDEX] /= weight; + + return dominant; + } + + /** + * Calculate a resource's min share ratios assuming that CPU and memory + * are the only configured resource types. The return array will be + * populated with the {@code resourceInfo} divided by {@code minShareInfo} + * for each resource type. If the min shares are 5 MB and 10 vcores, and + * the usage ({@code resourceInfo}) is 10 MB and 5 CPU, the ratios will + * be 2 and 0.5. + * + * The length of the {@code ratios} array must be 2. + * + * @param resourceInfo the resource for which to calculate min shares + * @param minShareInfo the min share + * @return the share ratios + */ + @VisibleForTesting + double[] calculateMinShareRatios(ResourceInformation[] resourceInfo, + ResourceInformation[] minShareInfo) { + double[] minShares1 = new double[2]; + + // both are needy below min share + minShares1[Resource.MEMORY_INDEX] = + ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) / + minShareInfo[Resource.MEMORY_INDEX].getValue(); + minShares1[Resource.VCORES_INDEX] = + ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) / + minShareInfo[Resource.VCORES_INDEX].getValue(); + + return minShares1; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java index 097558feb18..6a3332fa889 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -24,9 +24,9 @@ import java.util.Comparator; import java.util.Map; + import org.apache.curator.shaded.com.google.common.base.Joiner; import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; /** @@ -46,8 +46,8 @@ * container before sched2 */ public class TestDominantResourceFairnessPolicy { - @BeforeClass - public static void setup() { + @Before + public void setup() { addResources("test"); } @@ -77,7 +77,6 @@ private Schedulable createSchedulable(int memUsage, int cpuUsage, return createSchedulable(memUsage, cpuUsage, weights, 0, 0); } - private Schedulable createSchedulable(int memUsage, int cpuUsage, float weights, int minMemShare, int minCpuShare) { Resource usage = BuilderUtils.newResource(memUsage, cpuUsage); @@ -98,6 +97,12 @@ public void testSameDominantResource() { } @Test + public void testSameDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testSameDominantResource(); + } + + @Test public void testDifferentDominantResource() { Comparator c = createComparator(8000, 8); Schedulable s1 = createSchedulable(4000, 3); @@ -108,6 +113,12 @@ public void testDifferentDominantResource() { } @Test + public void testDifferentDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testDifferentDominantResource(); + } + + @Test public void testOneIsNeedy() { Comparator c = createComparator(8000, 8); Schedulable s1 = createSchedulable(2000, 5, 0, 6); @@ -118,6 +129,12 @@ public void testOneIsNeedy() { } @Test + public void testOneIsNeedy2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testOneIsNeedy(); + } + + @Test public void testBothAreNeedy() { Comparator c = createComparator(8000, 100); // dominant share is 2000/8000 @@ -138,6 +155,12 @@ public void testBothAreNeedy() { } @Test + public void testBothAreNeedy2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testBothAreNeedy(); + } + + @Test public void testEvenWeightsSameDominantResource() { assertTrue(createComparator(8000, 8).compare( createSchedulable(3000, 1, 2.0f), @@ -148,6 +171,12 @@ public void testEvenWeightsSameDominantResource() { } @Test + public void testEvenWeightsSameDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testEvenWeightsSameDominantResource(); + } + + @Test public void testEvenWeightsDifferentDominantResource() { assertTrue(createComparator(8000, 8).compare( createSchedulable(1000, 3, 2.0f), @@ -158,6 +187,12 @@ public void testEvenWeightsDifferentDominantResource() { } @Test + public void testEvenWeightsDifferentDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testEvenWeightsDifferentDominantResource(); + } + + @Test public void testSortShares() { float[][] ratios1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}}; float[][] ratios2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}}; @@ -207,6 +242,26 @@ public void testCalculateClusterAndFairRatios() { } @Test + public void testCalculateClusterAndFairRatios2() { + ResourceUtils.resetResourceTypes(new Configuration()); + Resource used = Resources.createResource(10, 5); + Resource capacity = Resources.createResource(100, 10); + double[] shares = new double[2]; + DominantResourceFairnessComparator comparator = + new DominantResourceFairnessComparator(); + int dominant = + comparator.calculateClusterAndFairRatios2(used.getResources(), 1.0f, + capacity.getResources(), shares); + + assertEquals("Calculated usage ratio for memory (10MB out of 100MB) is " + + "incorrect", 0.1, shares[Resource.MEMORY_INDEX], .00001); + assertEquals("Calculated usage ratio for vcores (5 out of 10) is " + + "incorrect", 0.5, shares[Resource.VCORES_INDEX], .00001); + assertEquals("The wrong dominant resource index was returned", + Resource.VCORES_INDEX, dominant); + } + + @Test public void testCalculateMinShareRatios() { Map index = ResourceUtils.getResourceTypeIndex(); Resource used = Resources.createResource(10, 5); @@ -232,6 +287,24 @@ public void testCalculateMinShareRatios() { } @Test + public void testCalculateMinShareRatios2() { + ResourceUtils.resetResourceTypes(new Configuration()); + Resource used = Resources.createResource(10, 5); + Resource minShares = Resources.createResource(5, 10); + DominantResourceFairnessComparator comparator = + new DominantResourceFairnessComparator(); + + double[] ratios = + comparator.calculateMinShareRatios2(used.getResources(), + minShares.getResources()); + + assertEquals("Calculated min share ratio for memory (10MB out of 5MB) is " + + "incorrect", 2.0, ratios[Resource.MEMORY_INDEX], .00001f); + assertEquals("Calculated min share ratio for vcores (5 out of 10) is " + + "incorrect", 0.5, ratios[Resource.VCORES_INDEX], .00001f); + } + + @Test public void testCompareShares() { float[][] ratios1 = { {0.4f, 0.1f, 2.0f},