/usr/bin/git diff "--no-prefix" "HEAD^"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index e3e25d18b6a..89f832e7ca5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -350,8 +350,7 @@ private static void initializeResourceTypesIfNeeded(Configuration conf,
initializeResourcesMap(conf);
initializedResources = true;
} catch (FileNotFoundException fe) {
- LOG.info("Unable to find '" + resourceFile
- + "'. Falling back to memory and vcores as resources.");
+ LOG.debug("Unable to find '" + resourceFile + "'.");
initializeResourcesMap(conf);
initializedResources = true;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
index 3ce15170eb4..372d363dd16 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
@@ -20,52 +20,149 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+/**
+ * {@code ResourceWeights} holds a list of weights for each resource type.
+ * This class is typically used in the scheduler code to represent weights or
+ * shares. It is effectively an array of floating point values that can be
+ * accessed by resource type as well as index.
+ */
@Private
@Evolving
public class ResourceWeights {
+ /**
+ * A {@code ResourceWeights} instance that has all weights set to 1.
+ * DO NOT MODIFY this instance.
+ */
public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f);
+ private final float[] weights;
- private final float[] weights = new float[ResourceType.values().length];
+ /**
+ * Create an instance with all weight set to 0.
+ */
+ public ResourceWeights() {
+ this(0.0f);
+ }
+ /**
+ * Create an instance with the given memory and CPU weights and all other
+ * weights set to 0.
+ *
+ * @param memoryWeight the memory weight
+ * @param cpuWeight the CPU weight
+ */
public ResourceWeights(float memoryWeight, float cpuWeight) {
+ this();
weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
weights[ResourceType.CPU.ordinal()] = cpuWeight;
}
+ /**
+ * Create an instance with all weights set to the given weight.
+ *
+ * @param weight the weight to use for all resource types
+ */
public ResourceWeights(float weight) {
+ ResourceInformation[] infos = ResourceUtils.getResourceTypesArray();
+
+ weights = new float[infos.length];
+
setWeight(weight);
}
- public ResourceWeights() { }
-
+ /**
+ * Set the weight for all resource types to the given weight.
+ *
+ * @param weight the weight to use for all resource types
+ */
public final void setWeight(float weight) {
for (int i = 0; i < weights.length; i++) {
weights[i] = weight;
}
}
- public void setWeight(ResourceType resourceType, float weight) {
- weights[resourceType.ordinal()] = weight;
+ /**
+ * Set the weight for the given resource type to the given weight.
+ *
+ * @param resource the name of the resource type to set
+ * @param weight the weight to set for the given resource type
+ */
+ public void setWeight(String resource, float weight) {
+ setWeight(indexOfResouce(resource), weight);
+ }
+
+ /**
+ * Set the weight at the given index to the given weight. Use
+ * {@link ResourceUtils#getResourceTypeIndex()} to map a resource type to
+ * an index.
+ *
+ * @param index the index of the resource type to set
+ * @param weight the weight to set at the given index
+ * @see ResourceUtils#getResourceTypeIndex()
+ */
+ public void setWeight(int index, float weight) {
+ weights[index] = weight;
+ }
+
+ /**
+ * Private wrapper method to lookup the resource index in the mapping and
+ * handle the case when it's not found.
+ *
+ * @param resource the resource type to lookup
+ * @return the index of that resource type in the arrays returned from
+ * {@link ResourceUtils}.
+ */
+ private int indexOfResouce(String resource) {
+ Integer index = ResourceUtils.getResourceTypeIndex().get(resource);
+
+ if (index == null) {
+ throw new ResourceNotFoundException(resource);
+ }
+
+ return index;
+ }
+
+ /**
+ * Return the weight for the given resource type.
+ *
+ * @param resource the resource type for which to return the weight
+ * @return the weight
+ */
+ public float getWeight(String resource) {
+ return getWeight(indexOfResouce(resource));
}
-
- public float getWeight(ResourceType resourceType) {
- return weights[resourceType.ordinal()];
+
+ /**
+ * Return the weight at the given index. Use
+ * {@link ResourceUtils#getResourceTypeIndex()} to map a resource type to
+ * an index.
+ *
+ * @param index the index from which to return the weight
+ * @return the weight
+ * @see ResourceUtils#getResourceTypeIndex()
+ */
+ public float getWeight(int index) {
+ return weights[index];
}
-
+
+ @Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
+ StringBuilder sb = new StringBuilder();
+
sb.append("<");
- for (int i = 0; i < ResourceType.values().length; i++) {
- if (i != 0) {
- sb.append(", ");
- }
- ResourceType resourceType = ResourceType.values()[i];
- sb.append(StringUtils.toLowerCase(resourceType.name()));
- sb.append(StringUtils.format(" weight=%.1f", getWeight(resourceType)));
+
+ for (int i = 0; i < weights.length; i++) {
+ sb.append(String.format("%s weight=%.1f", info[i].getName(), weights[i]));
+ sb.append(", ");
}
+
+ sb.delete(sb.length() - 2, sb.length());
sb.append(">");
+
return sb.toString();
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 440c73cefdd..c2be0667cae 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -21,7 +21,6 @@
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
@@ -47,7 +46,7 @@
*/
public static void computeShares(
Collection extends Schedulable> schedulables, Resource totalResources,
- ResourceType type) {
+ String type) {
computeSharesInternal(schedulables, totalResources, type, false);
}
@@ -62,7 +61,7 @@ public static void computeShares(
*/
public static void computeSteadyShares(
Collection extends FSQueue> queues, Resource totalResources,
- ResourceType type) {
+ String type) {
computeSharesInternal(queues, totalResources, type, true);
}
@@ -110,9 +109,9 @@ public static void computeSteadyShares(
*/
private static void computeSharesInternal(
Collection extends Schedulable> allSchedulables,
- Resource totalResources, ResourceType type, boolean isSteadyShare) {
+ Resource totalResources, String type, boolean isSteadyShare) {
- Collection schedulables = new ArrayList();
+ Collection schedulables = new ArrayList<>();
int takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
@@ -124,7 +123,7 @@ private static void computeSharesInternal(
// have met all Schedulables' max shares.
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
- long maxShare = getResourceValue(sched.getMaxShare(), type);
+ long maxShare = sched.getMaxShare().getResourceValue(type);
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
@@ -132,7 +131,7 @@ private static void computeSharesInternal(
}
}
- long totalResource = Math.max((getResourceValue(totalResources, type) -
+ long totalResource = Math.max((totalResources.getResourceValue(type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
@@ -159,13 +158,15 @@ private static void computeSharesInternal(
}
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
+ Resource target;
+
if (isSteadyShare) {
- setResourceValue(computeShare(sched, right, type),
- ((FSQueue) sched).getSteadyFairShare(), type);
+ target = ((FSQueue) sched).getSteadyFairShare();
} else {
- setResourceValue(
- computeShare(sched, right, type), sched.getFairShare(), type);
+ target = sched.getFairShare();
}
+
+ target.setResourceValue(type, (long)computeShare(sched, right, type));
}
}
@@ -174,7 +175,7 @@ private static void computeSharesInternal(
* w2rRatio, for use in the computeFairShares algorithm as described in #
*/
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
- Collection extends Schedulable> schedulables, ResourceType type) {
+ Collection extends Schedulable> schedulables, String type) {
int resourcesTaken = 0;
for (Schedulable sched : schedulables) {
int share = computeShare(sched, w2rRatio, type);
@@ -188,10 +189,10 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
* weight-to-resource ratio w2rRatio.
*/
private static int computeShare(Schedulable sched, double w2rRatio,
- ResourceType type) {
+ String type) {
double share = sched.getWeights().getWeight(type) * w2rRatio;
- share = Math.max(share, getResourceValue(sched.getMinShare(), type));
- share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
+ share = Math.max(share, sched.getMinShare().getResourceValue(type));
+ share = Math.min(share, sched.getMaxShare().getResourceValue(type));
return (int) share;
}
@@ -203,7 +204,7 @@ private static int computeShare(Schedulable sched, double w2rRatio,
private static int handleFixedFairShares(
Collection extends Schedulable> schedulables,
Collection nonFixedSchedulables,
- boolean isSteadyShare, ResourceType type) {
+ boolean isSteadyShare, String type) {
int totalResource = 0;
for (Schedulable sched : schedulables) {
@@ -211,11 +212,15 @@ private static int handleFixedFairShares(
if (fixedShare < 0) {
nonFixedSchedulables.add(sched);
} else {
- setResourceValue(fixedShare,
- isSteadyShare
- ? ((FSQueue)sched).getSteadyFairShare()
- : sched.getFairShare(),
- type);
+ Resource target;
+
+ if (isSteadyShare) {
+ target = ((FSQueue)sched).getSteadyFairShare();
+ } else {
+ target = sched.getFairShare();
+ }
+
+ target.setResourceValue(type, fixedShare);
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
Integer.MAX_VALUE);
}
@@ -230,10 +235,10 @@ private static int handleFixedFairShares(
* or the Schedulable is not active for instantaneous fairshare.
*/
private static long getFairShareIfFixed(Schedulable sched,
- boolean isSteadyShare, ResourceType type) {
+ boolean isSteadyShare, String type) {
// Check if maxShare is 0
- if (getResourceValue(sched.getMaxShare(), type) <= 0) {
+ if (sched.getMaxShare().getResourceValue(type) <= 0) {
return 0;
}
@@ -245,34 +250,10 @@ private static long getFairShareIfFixed(Schedulable sched,
// Check if weight is 0
if (sched.getWeights().getWeight(type) <= 0) {
- long minShare = getResourceValue(sched.getMinShare(), type);
+ long minShare = sched.getMinShare().getResourceValue(type);
return (minShare <= 0) ? 0 : minShare;
}
return -1;
}
-
- private static long getResourceValue(Resource resource, ResourceType type) {
- switch (type) {
- case MEMORY:
- return resource.getMemorySize();
- case CPU:
- return resource.getVirtualCores();
- default:
- throw new IllegalArgumentException("Invalid resource");
- }
- }
-
- private static void setResourceValue(long val, Resource resource, ResourceType type) {
- switch (type) {
- case MEMORY:
- resource.setMemorySize(val);
- break;
- case CPU:
- resource.setVirtualCores((int)val);
- break;
- default:
- throw new IllegalArgumentException("Invalid resource");
- }
- }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 72377b0c096..7ce0e3e2698 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@@ -25,18 +26,16 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
-
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-
-import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
/**
* Makes scheduling decisions by trying to equalize dominant resource usage.
@@ -72,16 +71,16 @@ public ResourceCalculator getResourceCalculator() {
@Override
public void computeShares(Collection extends Schedulable> schedulables,
Resource totalResources) {
- for (ResourceType type : ResourceType.values()) {
- ComputeFairShares.computeShares(schedulables, totalResources, type);
+ for (String name: ResourceUtils.getResourceNamesArray()) {
+ ComputeFairShares.computeShares(schedulables, totalResources, name);
}
}
@Override
public void computeSteadyShares(Collection extends FSQueue> queues,
Resource totalResources) {
- for (ResourceType type : ResourceType.values()) {
- ComputeFairShares.computeSteadyShares(queues, totalResources, type);
+ for (String name: ResourceUtils.getResourceNamesArray()) {
+ ComputeFairShares.computeSteadyShares(queues, totalResources, name);
}
}
@@ -110,9 +109,13 @@ public void initialize(FSContext fsContext) {
COMPARATOR.setFSContext(fsContext);
}
- public static class DominantResourceFairnessComparator implements Comparator {
- private static final int NUM_RESOURCES = ResourceType.values().length;
-
+ /**
+ * This class compares two {@link Schedulable} instances according to the
+ * DRF policy. If neither instance is below min share, approximate fair share
+ * ratios are compared.
+ */
+ public static class DominantResourceFairnessComparator
+ implements Comparator {
private FSContext fsContext;
public void setFSContext(FSContext fsContext) {
@@ -121,89 +124,196 @@ public void setFSContext(FSContext fsContext) {
@Override
public int compare(Schedulable s1, Schedulable s2) {
- ResourceWeights sharesOfCluster1 = new ResourceWeights();
- ResourceWeights sharesOfCluster2 = new ResourceWeights();
- ResourceWeights sharesOfMinShare1 = new ResourceWeights();
- ResourceWeights sharesOfMinShare2 = new ResourceWeights();
- ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
- ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
+ ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
+ Resource usage1 = s1.getResourceUsage();
+ Resource usage2 = s2.getResourceUsage();
+ Resource minShare1 = s1.getMinShare();
+ Resource minShare2 = s2.getMinShare();
Resource clusterCapacity = fsContext.getClusterResource();
- // Calculate shares of the cluster for each resource both schedulables.
- calculateShares(s1.getResourceUsage(),
- clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());
- calculateShares(s1.getResourceUsage(),
- s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL);
- calculateShares(s2.getResourceUsage(),
- clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights());
- calculateShares(s2.getResourceUsage(),
- s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL);
-
+ // These arrays hold the usage, fair, and min share ratios for each
+ // resource type. ratios[0][x] are the usage ratios, ratios[1][x] are
+ // the fair share ratios, and ratios[2][x] are the min share ratios.
+ float[][] ratios1 = new float[info.length][3];
+ float[][] ratios2 = new float[info.length][3];
+
+ // Calculate cluster shares and approximate fair shares for each
+ // resource type of both schedulables.
+ int dominant1 = calculateClusterAndFairRatios(usage1, clusterCapacity,
+ ratios1, s1.getWeights());
+ int dominant2 = calculateClusterAndFairRatios(usage2, clusterCapacity,
+ ratios2, s2.getWeights());
+
// A queue is needy for its min share if its dominant resource
- // (with respect to the cluster capacity) is below its configured min share
- // for that resource
- boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f;
- boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f;
+ // (with respect to the cluster capacity) is below its configured min
+ // share for that resource
+ boolean s1Needy =
+ usage1.getResources()[dominant1].getValue() <
+ minShare1.getResources()[dominant1].getValue();
+ boolean s2Needy =
+ usage2.getResources()[dominant2].getValue() <
+ minShare2.getResources()[dominant2].getValue();
int res = 0;
+
if (!s2Needy && !s1Needy) {
- res = compareShares(sharesOfCluster1, sharesOfCluster2,
- resourceOrder1, resourceOrder2);
+ // Sort shares by usage ratio and compare them by approximate fair share
+ // ratio
+ sortRatios(ratios1, ratios2);
+ res = compareRatios(ratios1, ratios2, 1);
} else if (s1Needy && !s2Needy) {
res = -1;
} else if (s2Needy && !s1Needy) {
res = 1;
} else { // both are needy below min share
- res = compareShares(sharesOfMinShare1, sharesOfMinShare2,
- resourceOrder1, resourceOrder2);
+ // Calculate the min share ratios, then sort by usage ratio, and compare
+ // by min share ratio
+ calculateMinShareRatios(usage1, minShare1, ratios1);
+ calculateMinShareRatios(usage2, minShare2, ratios2);
+ sortRatios(ratios1, ratios2);
+ res = compareRatios(ratios1, ratios2, 2);
}
+
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
}
+
return res;
}
-
+
+ /**
+ * Sort both ratios arrays according to the usage ratios (the
+ * first index of the inner arrays, e.g. {@code ratios1[x][0]}).
+ *
+ * @param ratios1 the first ratios array
+ * @param ratios2 the second ratios array
+ */
+ @VisibleForTesting
+ void sortRatios(float[][] ratios1, float[][]ratios2) {
+ // sort order descending by resource share
+ Arrays.sort(ratios1, (float[] o1, float[] o2) ->
+ (int) Math.signum(o2[0] - o1[0]));
+ Arrays.sort(ratios2, (float[] o1, float[] o2) ->
+ (int) Math.signum(o2[0] - o1[0]));
+ }
+
/**
- * Calculates and orders a resource's share of a pool in terms of two vectors.
- * The shares vector contains, for each resource, the fraction of the pool that
- * it takes up. The resourceOrder vector contains an ordering of resources
- * by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
- * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
+ * Calculate a resource's usage ratio and approximate fair share ratio.
+ * The {@code shares} array will be populated with both the usage ratio
+ * and the approximate fair share ratio for each resource type. The usage
+ * ratio is calculated as {@code resource} divided by {@code cluster}.
+ * The approximate fair share ratio is calculated as the usage ratio
+ * divided by {@code weight}. If the cluster's resources are 100MB and
+ * 10 vcores, and the usage ({@code resource}) is 10 MB and 5 CPU, the
+ * usage ratios will be 0.1 and 0.5. If the weights are 2, the fair
+ * share ratios will be 0.05 and 0.25.
+ *
+ * The approximate fair share ratio is the usage divided by the
+ * approximate fair share, i.e. the cluster resources times the weight.
+ * The approximate fair share is an acceptable proxy for the fair share
+ * because when comparing resources, the resource with the higher weight
+ * will be assigned by the scheduler a proportionally higher fair share.
+ *
+ * The {@code shares} array must be at least n x 2, where n
+ * is the number of resource types. Only the first and second indices of
+ * the inner arrays in the {@code shares} array will be used, e.g.
+ * {@code shares[x][0]} and {@code shares[x][1]}.
+ *
+ * The return value will be the index of the dominant resource type in the
+ * {@code shares} array. The dominant resource is the resource type for
+ * which {@code resource} has the largest usage ratio.
+ *
+ * @param resource the resource for which to calculate ratios
+ * @param cluster the total cluster resources
+ * @param ratios the shares array to populate
+ * @param weights the resource weights
+ * @return the index of the resource type with the largest cluster share
*/
@VisibleForTesting
- void calculateShares(Resource resource, Resource pool,
- ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
- shares.setWeight(MEMORY, (float)resource.getMemorySize() /
- (pool.getMemorySize() * weights.getWeight(MEMORY)));
- shares.setWeight(CPU, (float)resource.getVirtualCores() /
- (pool.getVirtualCores() * weights.getWeight(CPU)));
- // sort order vector by resource share
- if (resourceOrder != null) {
- if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
- resourceOrder[0] = MEMORY;
- resourceOrder[1] = CPU;
- } else {
- resourceOrder[0] = CPU;
- resourceOrder[1] = MEMORY;
+ int calculateClusterAndFairRatios(Resource resource, Resource cluster,
+ float[][] ratios, ResourceWeights weights) {
+ ResourceInformation[] resourceInfo = resource.getResources();
+ ResourceInformation[] clusterInfo = cluster.getResources();
+ int max = 0;
+
+ for (int i = 0; i < clusterInfo.length; i++) {
+ // First calculate the cluster share
+ ratios[i][0] = resourceInfo[i].getValue() / clusterInfo[i].getValue();
+
+ // Use the cluster share to find the dominant resource
+ if (ratios[i][0] > ratios[max][0]) {
+ max = i;
}
+
+ // Now divide by the weight to get the approximate fair share.
+ // It's OK if the weight is zero, because the floating point division
+ // will yield Infinity, i.e. this Schedulable will lose out to any
+ // other Schedulable with non-zero weight.
+ ratios[i][1] = ratios[i][0] / weights.getWeight(i);
}
+
+ return max;
}
- private int compareShares(ResourceWeights shares1, ResourceWeights shares2,
- ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) {
- for (int i = 0; i < resourceOrder1.length; i++) {
- int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i])
- - shares2.getWeight(resourceOrder2[i]));
+ /**
+ * Calculate a resource's min share ratios. The {@code ratios} array will be
+ * populated with the {@code resource} divided by {@code minShare} for each
+ * resource type. If the min shares are 5 MB and 10 vcores, and the usage
+ * ({@code resource}) is 10 MB and 5 CPU, the ratios will be 2 and 0.5.
+ *
+ * The {@code ratios} array must be n x 3, where n is the
+ * number of resource types. Only the third index of the inner arrays in
+ * the {@code ratios} array will be used, e.g. {@code ratios[x][2]}.
+ *
+ * @param resource the resource for which to calculate min shares
+ * @param minShare the min share
+ * @param ratios the shares array to populate
+ */
+ @VisibleForTesting
+ void calculateMinShareRatios(Resource resource, Resource minShare,
+ float[][] ratios) {
+ ResourceInformation[] resourceInfo = resource.getResources();
+ ResourceInformation[] minShareInfo = minShare.getResources();
+
+ for (int i = 0; i < minShareInfo.length; i++) {
+ ratios[i][2] = resourceInfo[i].getValue() / minShareInfo[i].getValue();
+ }
+ }
+
+ /**
+ * Compare the two ratios arrays and return -1, 0, or 1 if the first array
+ * is less than, equal to, or greater than the second array, respectively.
+ * The {@code index} parameter determines which index of the inner arrays
+ * will be used for the comparisons. 0 is for usage ratios, 1 is for
+ * fair share ratios, and 2 is for the min share ratios. The ratios arrays
+ * are assumed to be sorted in descending order by usage ratio.
+ *
+ * @param ratios1 the first shares array
+ * @param ratios2 the second shares array
+ * @param index the outer index of the shares arrays to compare. 0 is for
+ * fair shares, and 1 is for min shares
+ * @return -1, 0, or 1 if the first array is less than, equal to, or
+ * greater than the second array, respectively
+ */
+ @VisibleForTesting
+ int compareRatios(float[][] ratios1, float[][] ratios2, int index) {
+ int ret = 0;
+
+ for (int i = 0; i < ratios1.length; i++) {
+ ret = (int) Math.signum(ratios1[i][index] - ratios2[i][index]);
+
if (ret != 0) {
- return ret;
+ break;
}
}
- return 0;
+
+ return ret;
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 0ef90a1d72f..074f2545b3a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -26,7 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
@@ -42,9 +42,10 @@
@Private
@Unstable
public class FairSharePolicy extends SchedulingPolicy {
- private static final Log LOG = LogFactory.getLog(FairSharePolicy.class);
@VisibleForTesting
public static final String NAME = "fair";
+ private static final Log LOG = LogFactory.getLog(FairSharePolicy.class);
+ private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
private static final FairShareComparator COMPARATOR =
@@ -164,10 +165,11 @@ private int compareMinShareUsage(Schedulable s1, Schedulable s2,
*/
private int compareFairShareUsage(Schedulable s1, Schedulable s2,
Resource resourceUsage1, Resource resourceUsage2) {
- double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
- double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
+ double weight1 = s1.getWeights().getWeight(MEMORY);
+ double weight2 = s2.getWeights().getWeight(MEMORY);
double useToWeightRatio1;
double useToWeightRatio2;
+
if (weight1 > 0.0 && weight2 > 0.0) {
useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
@@ -213,14 +215,13 @@ public Resource getHeadroom(Resource queueFairShare,
@Override
public void computeShares(Collection extends Schedulable> schedulables,
Resource totalResources) {
- ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
+ ComputeFairShares.computeShares(schedulables, totalResources, MEMORY);
}
@Override
public void computeSteadyShares(Collection extends FSQueue> queues,
Resource totalResources) {
- ComputeFairShares.computeSteadyShares(queues, totalResources,
- ResourceType.MEMORY);
+ ComputeFairShares.computeSteadyShares(queues, totalResources, MEMORY);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
index 9561234d633..bedc7627ed3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
@@ -34,11 +34,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -137,7 +137,8 @@ protected void checkDefaultQueueBeforePlanFollowerRun() {
}
@Override
protected void verifyCapacity(Queue defQ) {
- assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9);
+ assertTrue(((FSQueue) defQ).getWeights()
+ .getWeight(ResourceInformation.MEMORY_MB.getName()) > 0.9);
}
@Override
@@ -174,7 +175,8 @@ protected void assertReservationQueueExists(ReservationId r,
assertNotNull(q);
// For now we are setting both to same weight
Assert.assertEquals(expectedCapacity,
- q.getWeights().getWeight(ResourceType.MEMORY), 0.01);
+ q.getWeights().getWeight(ResourceInformation.MEMORY_MB.getName()),
+ 0.01);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
index f420b9ecd22..059c71134d3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
@@ -17,39 +17,31 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.junit.Assert;
import org.junit.Test;
public class TestResourceWeights {
- @Test(timeout=3000)
+ @Test
public void testWeights() {
ResourceWeights rw1 = new ResourceWeights();
Assert.assertEquals("Default CPU weight should be 0.0f.", 0.0f,
- rw1.getWeight(ResourceType.CPU), 0.00001f);
+ rw1.getWeight(ResourceInformation.VCORES.getName()), 0.00001f);
Assert.assertEquals("Default memory weight should be 0.0f", 0.0f,
- rw1.getWeight(ResourceType.MEMORY), 0.00001f);
+ rw1.getWeight(ResourceInformation.MEMORY_MB.getName()), 0.00001f);
ResourceWeights rw2 = new ResourceWeights(2.0f);
Assert.assertEquals("The CPU weight should be 2.0f.", 2.0f,
- rw2.getWeight(ResourceType.CPU), 0.00001f);
+ rw2.getWeight(ResourceInformation.VCORES.getName()), 0.00001f);
Assert.assertEquals("The memory weight should be 2.0f", 2.0f,
- rw2.getWeight(ResourceType.MEMORY), 0.00001f);
+ rw2.getWeight(ResourceInformation.MEMORY_MB.getName()), 0.00001f);
- // set each individually
- ResourceWeights rw3 = new ResourceWeights(1.5f, 2.0f);
- Assert.assertEquals("The CPU weight should be 2.0f", 2.0f,
- rw3.getWeight(ResourceType.CPU), 0.00001f);
- Assert.assertEquals("The memory weight should be 1.5f", 1.5f,
- rw3.getWeight(ResourceType.MEMORY), 0.00001f);
-
- // reset weights
- rw3.setWeight(ResourceType.CPU, 2.5f);
- Assert.assertEquals("The CPU weight should be set to 2.5f.", 2.5f,
- rw3.getWeight(ResourceType.CPU), 0.00001f);
- rw3.setWeight(ResourceType.MEMORY, 4.0f);
- Assert.assertEquals("The memory weight should be set to 4.0f.", 4.0f,
- rw3.getWeight(ResourceType.MEMORY), 0.00001f);
+ ResourceWeights rwn = ResourceWeights.NEUTRAL;
+ Assert.assertEquals("Default CPU weight should be 1.0f.", 1.0f,
+ rwn.getWeight(ResourceInformation.VCORES.getName()), 1.00001f);
+ Assert.assertEquals("Default memory weight should be 1.0f", 1.0f,
+ rwn.getWeight(ResourceInformation.MEMORY_MB.getName()), 1.00001f);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
index 4f3ccb2acd4..bba09f1e080 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
@@ -20,11 +20,11 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.junit.Assert;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares;
import org.junit.Before;
@@ -52,7 +52,7 @@ public void testEqualSharing() {
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
ComputeFairShares.computeShares(scheds,
- Resources.createResource(40), ResourceType.MEMORY);
+ Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(10, 10, 10, 10);
}
@@ -70,7 +70,7 @@ public void testLowMaxShares() {
scheds.add(new FakeSchedulable(0, 11));
scheds.add(new FakeSchedulable(0, 3));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(40), ResourceType.MEMORY);
+ Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(13, 13, 11, 3);
}
@@ -90,7 +90,7 @@ public void testMinShares() {
scheds.add(new FakeSchedulable(0));
scheds.add(new FakeSchedulable(2));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(40), ResourceType.MEMORY);
+ Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(20, 18, 0, 2);
}
@@ -105,7 +105,7 @@ public void testWeightedSharing() {
scheds.add(new FakeSchedulable(0, 1.0));
scheds.add(new FakeSchedulable(0, 0.5));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(45), ResourceType.MEMORY);
+ Resources.createResource(45), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(20, 10, 10, 5);
}
@@ -123,7 +123,7 @@ public void testWeightedSharingWithMaxShares() {
scheds.add(new FakeSchedulable(0, 30, 1.0));
scheds.add(new FakeSchedulable(0, 20, 0.5));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(45), ResourceType.MEMORY);
+ Resources.createResource(45), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(10, 11, 16, 8);
}
@@ -142,7 +142,7 @@ public void testWeightedSharingWithMinShares() {
scheds.add(new FakeSchedulable(5, 1.0));
scheds.add(new FakeSchedulable(15, 0.5));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(45), ResourceType.MEMORY);
+ Resources.createResource(45), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(20, 5, 5, 15);
}
@@ -158,7 +158,8 @@ public void testLargeShares() {
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
ComputeFairShares.computeShares(scheds,
- Resources.createResource(40 * million), ResourceType.MEMORY);
+ Resources.createResource(40 * million),
+ ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
}
@@ -168,7 +169,7 @@ public void testLargeShares() {
@Test
public void testEmptyList() {
ComputeFairShares.computeShares(scheds,
- Resources.createResource(40), ResourceType.MEMORY);
+ Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
verifyMemoryShares();
}
@@ -186,7 +187,7 @@ public void testCPU() {
scheds.add(new FakeSchedulable(Resources.createResource(0, 15),
new ResourceWeights(0.5f)));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(0, 45), ResourceType.CPU);
+ Resources.createResource(0, 45), ResourceInformation.VCORES.getName());
verifyCPUShares(20, 5, 5, 15);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 446b6ee4e9a..ec80c176cd0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -79,7 +79,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -123,6 +122,7 @@
import org.xml.sax.SAXException;
import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
@SuppressWarnings("unchecked")
public class TestFairScheduler extends FairSchedulerTestBase {
@@ -1984,7 +1984,8 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
// assert that the steady fair share is 1/4th node1's capacity
assertEquals(capacity / 4, leaf.getSteadyFairShare().getMemorySize());
// assert weights are equal for both the user queues
- assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
+ assertEquals(1.0, leaf.getWeights()
+ .getWeight(ResourceInformation.MEMORY_MB.getName()), 0);
}
}
}
@@ -5275,7 +5276,7 @@ public void testDumpState() throws IOException {
child1.updateDemand();
String childQueueString = "{Name: root.parent.child1,"
- + " Weight: ,"
+ + " Weight: ,"
+ " Policy: fair,"
+ " FairShare: ,"
+ " SteadyFairShare: ,"
@@ -5292,14 +5293,15 @@ public void testDumpState() throws IOException {
+ " LastTimeAtMinShare: " + clock.getTime()
+ "}";
- assertTrue(child1.dumpState().equals(childQueueString));
+ assertEquals("Unexpected state dump string",
+ childQueueString, child1.dumpState());
FSParentQueue parent =
scheduler.getQueueManager().getParentQueue("parent", false);
parent.setMaxShare(resource);
parent.updateDemand();
String parentQueueString = "{Name: root.parent,"
- + " Weight: ,"
+ + " Weight: ,"
+ " Policy: fair,"
+ " FairShare: ,"
+ " SteadyFairShare: ,"
@@ -5310,7 +5312,7 @@ public void testDumpState() throws IOException {
+ " MaxAMShare: 0.5,"
+ " Runnable: 0}";
- assertTrue(parent.dumpState().equals(
- parentQueueString + ", " + childQueueString));
+ assertEquals("Unexpected state dump string",
+ parentQueueString + ", " + childQueueString, parent.dumpState());
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
index 3719e2aee08..88ca040126c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
@@ -23,15 +23,23 @@
import static org.mockito.Mockito.when;
import java.util.Comparator;
+import java.util.Map;
+import org.apache.curator.shaded.com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparator;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -39,10 +47,15 @@
* container before sched2
*/
public class TestDominantResourceFairnessPolicy {
+ @BeforeClass
+ public static void setup() {
+ addResources("test");
+ }
private Comparator createComparator(int clusterMem,
int clusterCpu) {
- DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy();
+ DominantResourceFairnessPolicy policy =
+ new DominantResourceFairnessPolicy();
FSContext fsContext = mock(FSContext.class);
when(fsContext.getClusterResource()).
thenReturn(Resources.createResource(clusterMem, clusterCpu));
@@ -77,37 +90,52 @@ private Schedulable createSchedulable(int memUsage, int cpuUsage,
@Test
public void testSameDominantResource() {
- assertTrue(createComparator(8000, 4).compare(
- createSchedulable(1000, 1),
- createSchedulable(2000, 1)) < 0);
+ Comparator c = createComparator(8000, 4);
+ Schedulable s1 = createSchedulable(1000, 1);
+ Schedulable s2 = createSchedulable(2000, 1);
+
+ assertTrue("Comparison didn't return a value less than 0",
+ c.compare(s1, s2) < 0);
}
@Test
public void testDifferentDominantResource() {
- assertTrue(createComparator(8000, 8).compare(
- createSchedulable(4000, 3),
- createSchedulable(2000, 5)) < 0);
+ Comparator c = createComparator(8000, 8);
+ Schedulable s1 = createSchedulable(4000, 3);
+ Schedulable s2 = createSchedulable(2000, 5);
+
+ assertTrue("Comparison didn't return a value less than 0",
+ c.compare(s1, s2) < 0);
}
@Test
public void testOneIsNeedy() {
- assertTrue(createComparator(8000, 8).compare(
- createSchedulable(2000, 5, 0, 6),
- createSchedulable(4000, 3, 0, 0)) < 0);
+ Comparator c = createComparator(8000, 8);
+ Schedulable s1 = createSchedulable(2000, 5, 0, 6);
+ Schedulable s2 = createSchedulable(4000, 3, 0, 0);
+
+ assertTrue("Comparison didn't return a value less than 0",
+ c.compare(s1, s2) < 0);
}
@Test
public void testBothAreNeedy() {
- assertTrue(createComparator(8000, 100).compare(
- // dominant share is 2000/8000
- createSchedulable(2000, 5),
- // dominant share is 4000/8000
- createSchedulable(4000, 3)) < 0);
- assertTrue(createComparator(8000, 100).compare(
- // dominant min share is 2/3
- createSchedulable(2000, 5, 3000, 6),
- // dominant min share is 4/5
- createSchedulable(4000, 3, 5000, 4)) < 0);
+ Comparator c = createComparator(8000, 100);
+ // dominant share is 2000/8000
+ Schedulable s1 = createSchedulable(2000, 5);
+ // dominant share is 4000/8000
+ Schedulable s2 = createSchedulable(4000, 3);
+
+ assertTrue("Comparison didn't return a value less than 0",
+ c.compare(s1, s2) < 0);
+
+ // dominant min share is 2/3
+ s1 = createSchedulable(2000, 5, 3000, 6);
+ // dominant min share is 4/5
+ s2 = createSchedulable(4000, 3, 5000, 4);
+
+ assertTrue("Comparison didn't return a value less than 0",
+ c.compare(s1, s2) < 0);
}
@Test
@@ -132,39 +160,229 @@ public void testEvenWeightsDifferentDominantResource() {
@Test
public void testUnevenWeightsSameDominantResource() {
+ ResourceWeights weights = new ResourceWeights();
+
+ weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 2.0f);
+ weights.setWeight(ResourceInformation.VCORES.getName(), 1.0f);
+
assertTrue(createComparator(8000, 8).compare(
- createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
+ createSchedulable(3000, 1, weights),
createSchedulable(2000, 1)) < 0);
+
+ weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 1.0f);
+ weights.setWeight(ResourceInformation.VCORES.getName(), 2.0f);
+
assertTrue(createComparator(8000, 8).compare(
- createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
+ createSchedulable(1000, 3, weights),
createSchedulable(1000, 2)) < 0);
}
@Test
public void testUnevenWeightsDifferentDominantResource() {
- assertTrue(createComparator(8000, 8).compare(
- createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
+ ResourceWeights weights = new ResourceWeights();
+
+ weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 1.0f);
+ weights.setWeight(ResourceInformation.VCORES.getName(), 2.0f);
+
+ assertTrue("Resource comparison resulted in wrong order",
+ createComparator(8000, 8).compare(
+ createSchedulable(1000, 3, weights),
createSchedulable(2000, 1)) < 0);
- assertTrue(createComparator(8000, 8).compare(
- createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
+
+ weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 2.0f);
+ weights.setWeight(ResourceInformation.VCORES.getName(), 1.0f);
+
+ assertTrue("Resource comparison resulted in wrong order",
+ createComparator(8000, 8).compare(
+ createSchedulable(3000, 1, weights),
createSchedulable(1000, 2)) < 0);
}
@Test
+ public void testSortShares() {
+ float[][] ratios1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}};
+ float[][] ratios2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}};
+ float[][] expected1 = {{0.4f, 0.1f}, {0.3f, 2.0f}, {0.2f, 1.0f}};
+ float[][] expected2 = {{0.3f, 2.0f}, {0.25f, 0.1f}, {0.2f, 9.0f}};
+ DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+
+ comparator.sortRatios(ratios1, ratios2);
+
+ for (int i = 0; i < ratios1.length; i++) {
+ Assert.assertArrayEquals("The shares array was not sorted into the "
+ + "expected order: incorrect inner array encountered",
+ expected1[i], ratios1[i], 0.00001f);
+ Assert.assertArrayEquals("The shares array was not sorted into the "
+ + "expected order: incorrect inner array encountered",
+ expected2[i], ratios2[i], 0.00001f);
+ }
+ }
+
+ @Test
public void testCalculateShares() {
+ Map index = ResourceUtils.getResourceTypeIndex();
Resource used = Resources.createResource(10, 5);
Resource capacity = Resources.createResource(100, 10);
- ResourceType[] resourceOrder = new ResourceType[2];
- ResourceWeights shares = new ResourceWeights();
- DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator =
- new DominantResourceFairnessPolicy.DominantResourceFairnessComparator();
- comparator.calculateShares(used, capacity, shares, resourceOrder,
- ResourceWeights.NEUTRAL);
-
- assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001);
- assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001);
- assertEquals(ResourceType.CPU, resourceOrder[0]);
- assertEquals(ResourceType.MEMORY, resourceOrder[1]);
+ float[][] shares = new float[3][2];
+ DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+
+ used.setResourceValue("test", 2L);
+ capacity.setResourceValue("test", 5L);
+
+ int dominant = comparator.calculateClusterAndFairRatios(used, capacity,
+ shares, ResourceWeights.NEUTRAL);
+
+ assertEquals("Calculated usage ratio for memory (10MB out of 100MB) is "
+ + "incorrect", 0.1,
+ shares[index.get(ResourceInformation.MEMORY_MB.getName())][0], .00001);
+ assertEquals("Calculated usage ratio for vcores (5 out of 10) is "
+ + "incorrect", 0.5,
+ shares[index.get(ResourceInformation.VCORES.getName())][0], .00001);
+ assertEquals("Calculated usage ratio for test resource (2 out of 5) is "
+ + "incorrect", 0.4, shares[index.get("test")][0], .00001);
+ assertEquals("The wrong dominant resource index was returned",
+ index.get(ResourceInformation.VCORES.getName()).intValue(),
+ dominant);
+ }
+
+ @Test
+ public void testCalculateMinShares() {
+ Map index = ResourceUtils.getResourceTypeIndex();
+ Resource used = Resources.createResource(10, 5);
+ Resource minShares = Resources.createResource(5, 10);
+ float[][] ratios = new float[3][2];
+ DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+
+ used.setResourceValue("test", 2L);
+ minShares.setResourceValue("test", 0L);
+
+ comparator.calculateMinShareRatios(used, minShares, ratios);
+
+ assertEquals("Calculated min share ratio for memory (10MB out of 5MB) is "
+ + "incorrect", 2.0,
+ ratios[index.get(ResourceInformation.MEMORY_MB.getName())][1], .00001);
+ assertEquals("Calculated min share ratio for vcores (5 out of 10) is "
+ + "incorrect", 0.5,
+ ratios[index.get(ResourceInformation.VCORES.getName())][1], .00001);
+ assertEquals("Calculated min share ratio for test resource (0 out of 5) is "
+ + "incorrect", 0.0, ratios[index.get("test")][0], .00001);
+ }
+
+ @Test
+ public void testCompareShares() {
+ float[][] ratios1 = {
+ {0.4f, 0.1f, 2.0f},
+ {0.3f, 2.0f, 0.1f},
+ {0.2f, 1.0f, 9.0f}
+ };
+ float[][] ratios2 = {
+ {0.3f, 2.0f, 1.0f},
+ {0.2f, 0.1f, 0.5f},
+ {0.2f, 1.0f, 2.0f}
+ };
+ float[][] ratios3 = {
+ {0.3f, 2.0f, 1.0f},
+ {0.2f, 0.1f, 2.0f},
+ {0.1f, 2.0f, 1.0f}
+ };
+ DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+
+ int ret = comparator.compareRatios(ratios1, ratios2, 0);
+
+ assertEquals("Expected the first array to be larger because the first "
+ + "usage ratio element is larger", 1, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios1, 0);
+
+ assertEquals("Expected the first array to be smaller because the first "
+ + "usage ratio element is smaller", -1, ret);
+
+ ret = comparator.compareRatios(ratios1, ratios1, 0);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios2, 0);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios3, ratios3, 0);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios3, 0);
+
+ assertEquals("Expected the first array to be larger because the last "
+ + "usage ratio element is larger, and all other elements are equal",
+ 1, ret);
+
+ ret = comparator.compareRatios(ratios1, ratios2, 1);
+
+ assertEquals("Expected the first array to be smaller because the first "
+ + "fair share ratio element is smaller", -1, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios1, 1);
+
+ assertEquals("Expected the first array to be larger because the first "
+ + "fair share ratio element is larger", 1, ret);
+
+ ret = comparator.compareRatios(ratios1, ratios1, 1);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios2, 1);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios3, ratios3, 1);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios3, 1);
+
+ assertEquals("Expected the first array to be smaller because the last "
+ + "usage ratio element is smaller, and all other elements are equal",
+ 1, ret);
+
+ ret = comparator.compareRatios(ratios1, ratios2, 2);
+
+ assertEquals("Expected the first array to be larger because the first "
+ + "min share ratio element is larger", 1, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios1, 2);
+
+ assertEquals("Expected the first array to be smaller because the first "
+ + "min share ratio element is smaller", -1, ret);
+
+ ret = comparator.compareRatios(ratios1, ratios1, 2);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios2, 2);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios3, ratios3, 2);
+
+ assertEquals("Expected the arrays to be equal, since they're the same "
+ + "array", 0, ret);
+
+ ret = comparator.compareRatios(ratios2, ratios3, 2);
+
+ assertEquals("Expected the first array to be larger because the second "
+ + "min share ratio element is larger, and all the first elements are "
+ + "equal", 1, ret);
}
@Test
@@ -183,4 +401,12 @@ public void testCompareSchedulablesWithClusterResourceChanges(){
assertTrue(createComparator(8000, 6)
.compare(schedulable1, schedulable2) < 0);
}
+
+ private static void addResources(String... resources) {
+ Configuration conf = new Configuration();
+
+ // Add a third resource to the allowed set
+ conf.set(YarnConfiguration.RESOURCE_TYPES, Joiner.on(',').join(resources));
+ ResourceUtils.resetResourceTypes(conf);
+ }
}