diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 49feafa39d8..516e94554f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -338,8 +338,7 @@ private static void updateResourceTypeIndex() { initializeResourcesMap(conf, resources); initializedResources = true; } catch (FileNotFoundException fe) { - LOG.info("Unable to find '" + resourceFile - + "'. Falling back to memory and vcores as resources", fe); + LOG.debug("Unable to find '" + resourceFile + "'."); initializeResourcesMap(conf, resources); initializedResources = true; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java index 4c62318e6f4..c3a56564ffb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java @@ -20,52 +20,138 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +/** + * {@code ResourceWeights} holds a list of weights for each resource type. + * This class is typically used in the scheduler code to represent weights or + * shares. It is effectively an array of floating point values that can be + * accessed by resource type as well as index. + */ @Private @Evolving public class ResourceWeights { + /** + * A {@code ResourceWeights} instance that has all weights set to 1. + * DO NOT MODIFY this instance. + */ public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f); + private final float[] weights; - private float[] weights = new float[ResourceType.values().length]; - - public ResourceWeights(float memoryWeight, float cpuWeight) { - weights[ResourceType.MEMORY.ordinal()] = memoryWeight; - weights[ResourceType.CPU.ordinal()] = cpuWeight; + /** + * Create an instance with all weight set to 0. + */ + public ResourceWeights() { + this(0.0f); } + /** + * Create an instance with all weights set to the given weight. + * + * @param weight the weight to use for all resource types + */ public ResourceWeights(float weight) { - setWeight(weight); - } + ResourceInformation[] infos = ResourceUtils.getResourceTypesArray(); + + weights = new float[infos.length]; - public ResourceWeights() { } + for (int i = 0; i < infos.length; i++) { + weights[i] = weight; + } + } - public void setWeight(float weight) { + /** + * Set the weight for all resource types to the given weight. 
+   *
+   * @param weight the weight to use for all resource types
+   */
+  public final void setWeight(float weight) {
     for (int i = 0; i < weights.length; i++) {
       weights[i] = weight;
     }
   }
 
-  public void setWeight(ResourceType resourceType, float weight) {
-    weights[resourceType.ordinal()] = weight;
+  /**
+   * Set the weight for the given resource type to the given weight.
+   *
+   * @param resource the name of the resource type to set
+   * @param weight the weight to set for the given resource type
+   */
+  public void setWeight(String resource, float weight) {
+    setWeight(indexOfResource(resource), weight);
+  }
+
+  /**
+   * Set the weight at the given index to the given weight. Use
+   * {@link ResourceUtils#getResourceTypeIndex()} to map a resource type to
+   * an index.
+   *
+   * @param index the index of the resource type to set
+   * @param weight the weight to set at the given index
+   * @see ResourceUtils#getResourceTypeIndex()
+   */
+  public void setWeight(int index, float weight) {
+    weights[index] = weight;
+  }
+
+  /**
+   * Private wrapper method to look up the resource index in the mapping and
+   * handle the case when it's not found.
+   *
+   * @param resource the resource type to look up
+   * @return the index of that resource type in the arrays returned from
+   * {@link ResourceUtils}.
+   */
+  private int indexOfResource(String resource) {
+    Integer index = ResourceUtils.getResourceTypeIndex().get(resource);
+
+    if (index == null) {
+      throw new ResourceNotFoundException(resource);
+    }
+
+    return index;
+  }
+
+  /**
+   * Return the weight for the given resource type.
+   *
+   * @param resource the resource type for which to return the weight
+   * @return the weight
+   */
+  public float getWeight(String resource) {
+    return getWeight(indexOfResource(resource));
   }
-
-  public float getWeight(ResourceType resourceType) {
-    return weights[resourceType.ordinal()];
+
+  /**
+   * Return the weight at the given index. Use
+   * {@link ResourceUtils#getResourceTypeIndex()} to map a resource type to
+   * an index.
+   *
+   * @param index the index from which to return the weight
+   * @return the weight
+   * @see ResourceUtils#getResourceTypeIndex()
+   */
+  public float getWeight(int index) {
+    return weights[index];
   }
-
+
+  @Override
   public String toString() {
-    StringBuffer sb = new StringBuffer();
+    ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
+    StringBuilder sb = new StringBuilder();
+
     sb.append("<");
-    for (int i = 0; i < ResourceType.values().length; i++) {
-      if (i != 0) {
-        sb.append(", ");
-      }
-      ResourceType resourceType = ResourceType.values()[i];
-      sb.append(StringUtils.toLowerCase(resourceType.name()));
-      sb.append(StringUtils.format(" weight=%.1f", getWeight(resourceType)));
+
+    for (int i = 0; i < weights.length; i++) {
+      sb.append(String.format("%s weight=%.1f", info[i], weights[i]));
+      sb.append(", ");
     }
+
+    // Drop the trailing ", " separator appended by the loop
+    sb.setLength(sb.length() - 2);
     sb.append(">");
+
     return sb.toString();
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 440c73cefdd..fd12c5ff3dd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -21,7 +21,6 @@
 import java.util.Collection;
 
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 
@@ -47,7 +46,7 @@
    */
   public static void computeShares(
       Collection schedulables, Resource totalResources,
-      ResourceType type) {
+      String type) {
     computeSharesInternal(schedulables, totalResources, type, false);
   }
 
@@ -62,7 +61,7 @@ public static void computeShares(
    */
   public static void computeSteadyShares(
       Collection queues, Resource totalResources,
-      ResourceType type) {
+      String type) {
     computeSharesInternal(queues, totalResources, type, true);
   }
 
@@ -110,9 +109,9 @@ public static void computeSteadyShares(
    */
   private static void computeSharesInternal(
       Collection allSchedulables,
-      Resource totalResources, ResourceType type, boolean isSteadyShare) {
+      Resource totalResources, String type, boolean isSteadyShare) {
 
-    Collection schedulables = new ArrayList();
+    Collection schedulables = new ArrayList<>();
     int takenResources = handleFixedFairShares(
         allSchedulables, schedulables, isSteadyShare, type);
 
@@ -124,7 +123,7 @@ private static void computeSharesInternal(
     // have met all Schedulables' max shares.
int totalMaxShare = 0; for (Schedulable sched : schedulables) { - long maxShare = getResourceValue(sched.getMaxShare(), type); + long maxShare = sched.getMaxShare().getResourceValue(type); totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare, Integer.MAX_VALUE); if (totalMaxShare == Integer.MAX_VALUE) { @@ -132,7 +131,7 @@ private static void computeSharesInternal( } } - long totalResource = Math.max((getResourceValue(totalResources, type) - + long totalResource = Math.max((totalResources.getResourceValue(type) - takenResources), 0); totalResource = Math.min(totalMaxShare, totalResource); @@ -159,13 +158,15 @@ private static void computeSharesInternal( } // Set the fair shares based on the value of R we've converged to for (Schedulable sched : schedulables) { + Resource target; + if (isSteadyShare) { - setResourceValue(computeShare(sched, right, type), - ((FSQueue) sched).getSteadyFairShare(), type); + target = ((FSQueue) sched).getSteadyFairShare(); } else { - setResourceValue( - computeShare(sched, right, type), sched.getFairShare(), type); + target = sched.getFairShare(); } + + target.setResourceValue(type, computeShare(sched, right, type)); } } @@ -174,7 +175,7 @@ private static void computeSharesInternal( * w2rRatio, for use in the computeFairShares algorithm as described in # */ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, - Collection schedulables, ResourceType type) { + Collection schedulables, String type) { int resourcesTaken = 0; for (Schedulable sched : schedulables) { int share = computeShare(sched, w2rRatio, type); @@ -188,10 +189,10 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, * weight-to-resource ratio w2rRatio. */ private static int computeShare(Schedulable sched, double w2rRatio, - ResourceType type) { + String type) { double share = sched.getWeights().getWeight(type) * w2rRatio; - share = Math.max(share, getResourceValue(sched.getMinShare(), type)); - share = Math.min(share, getResourceValue(sched.getMaxShare(), type)); + share = Math.max(share, sched.getMinShare().getResourceValue(type)); + share = Math.min(share, sched.getMaxShare().getResourceValue(type)); return (int) share; } @@ -203,7 +204,7 @@ private static int computeShare(Schedulable sched, double w2rRatio, private static int handleFixedFairShares( Collection schedulables, Collection nonFixedSchedulables, - boolean isSteadyShare, ResourceType type) { + boolean isSteadyShare, String type) { int totalResource = 0; for (Schedulable sched : schedulables) { @@ -211,11 +212,15 @@ private static int handleFixedFairShares( if (fixedShare < 0) { nonFixedSchedulables.add(sched); } else { - setResourceValue(fixedShare, - isSteadyShare - ? ((FSQueue)sched).getSteadyFairShare() - : sched.getFairShare(), - type); + Resource target; + + if (isSteadyShare) { + target = ((FSQueue)sched).getSteadyFairShare(); + } else { + target = sched.getFairShare(); + } + + target.setResourceValue(type, fixedShare); totalResource = (int) Math.min((long)totalResource + (long)fixedShare, Integer.MAX_VALUE); } @@ -230,10 +235,10 @@ private static int handleFixedFairShares( * or the Schedulable is not active for instantaneous fairshare. 
*/ private static long getFairShareIfFixed(Schedulable sched, - boolean isSteadyShare, ResourceType type) { + boolean isSteadyShare, String type) { // Check if maxShare is 0 - if (getResourceValue(sched.getMaxShare(), type) <= 0) { + if (sched.getMaxShare().getResourceValue(type) <= 0) { return 0; } @@ -245,34 +250,10 @@ private static long getFairShareIfFixed(Schedulable sched, // Check if weight is 0 if (sched.getWeights().getWeight(type) <= 0) { - long minShare = getResourceValue(sched.getMinShare(), type); + long minShare = sched.getMinShare().getResourceValue(type); return (minShare <= 0) ? 0 : minShare; } return -1; } - - private static long getResourceValue(Resource resource, ResourceType type) { - switch (type) { - case MEMORY: - return resource.getMemorySize(); - case CPU: - return resource.getVirtualCores(); - default: - throw new IllegalArgumentException("Invalid resource"); - } - } - - private static void setResourceValue(long val, Resource resource, ResourceType type) { - switch (type) { - case MEMORY: - resource.setMemorySize(val); - break; - case CPU: - resource.setVirtualCores((int)val); - break; - default: - throw new IllegalArgumentException("Invalid resource"); - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 193ed4dc36c..63a0a04bfc9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -18,24 +18,24 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; +import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; - import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; - -import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; /** * Makes scheduling decisions by trying to equalize dominant resource usage. 
@@ -71,16 +71,16 @@ public ResourceCalculator getResourceCalculator() { @Override public void computeShares(Collection schedulables, Resource totalResources) { - for (ResourceType type : ResourceType.values()) { - ComputeFairShares.computeShares(schedulables, totalResources, type); + for (String name: ResourceUtils.getResourceNamesArray()) { + ComputeFairShares.computeShares(schedulables, totalResources, name); } } @Override public void computeSteadyShares(Collection queues, Resource totalResources) { - for (ResourceType type : ResourceType.values()) { - ComputeFairShares.computeSteadyShares(queues, totalResources, type); + for (String name: ResourceUtils.getResourceNamesArray()) { + ComputeFairShares.computeSteadyShares(queues, totalResources, name); } } @@ -109,9 +109,8 @@ public void initialize(FSContext fsContext) { COMPARATOR.setFSContext(fsContext); } - public static class DominantResourceFairnessComparator implements Comparator { - private static final int NUM_RESOURCES = ResourceType.values().length; - + public static class DominantResourceFairnessComparator + implements Comparator { private FSContext fsContext; public void setFSContext(FSContext fsContext) { @@ -120,88 +119,172 @@ public void setFSContext(FSContext fsContext) { @Override public int compare(Schedulable s1, Schedulable s2) { - ResourceWeights sharesOfCluster1 = new ResourceWeights(); - ResourceWeights sharesOfCluster2 = new ResourceWeights(); - ResourceWeights sharesOfMinShare1 = new ResourceWeights(); - ResourceWeights sharesOfMinShare2 = new ResourceWeights(); - ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES]; - ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES]; + ResourceInformation[] info = ResourceUtils.getResourceTypesArray(); + Resource usage1 = s1.getResourceUsage(); + Resource usage2 = s2.getResourceUsage(); + Resource minShare1 = s1.getMinShare(); + Resource minShare2 = s2.getMinShare(); + float[][] shares1 = new float[info.length][2]; + float[][] shares2 = new float[info.length][2]; Resource clusterCapacity = fsContext.getClusterResource(); // Calculate shares of the cluster for each resource both schedulables. 
- calculateShares(s1.getResourceUsage(), - clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights()); - calculateShares(s1.getResourceUsage(), - s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL); - calculateShares(s2.getResourceUsage(), - clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights()); - calculateShares(s2.getResourceUsage(), - s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL); - + int dominant1 = calculateShares(usage1, clusterCapacity, shares1, + s1.getWeights()); + int dominant2 = calculateShares(usage2, clusterCapacity, shares2, + s2.getWeights()); + // A queue is needy for its min share if its dominant resource - // (with respect to the cluster capacity) is below its configured min share - // for that resource - boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f; - boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f; + // (with respect to the cluster capacity) is below its configured min + // share for that resource + boolean s1Needy = + usage1.getResources()[dominant1].getValue() < + minShare1.getResources()[dominant1].getValue(); + boolean s2Needy = + usage2.getResources()[dominant2].getValue() < + minShare2.getResources()[dominant2].getValue(); int res = 0; + if (!s2Needy && !s1Needy) { - res = compareShares(sharesOfCluster1, sharesOfCluster2, - resourceOrder1, resourceOrder2); + // Sort shares by fair share and compare them by fair share + sortShares(shares1, shares2); + res = compareShares(shares1, shares2, true); } else if (s1Needy && !s2Needy) { res = -1; } else if (s2Needy && !s1Needy) { res = 1; } else { // both are needy below min share - res = compareShares(sharesOfMinShare1, sharesOfMinShare2, - resourceOrder1, resourceOrder2); + // Calculate the min shares, then sort by fair share, and compare them + // by min share + calculateMinShares(usage1, minShare1, shares1, s1.getWeights()); + calculateMinShares(usage2, minShare2, shares2, s2.getWeights()); + sortShares(shares1, shares2); + res = compareShares(shares1, shares2, false); } + if (res == 0) { // Apps are tied in fairness ratio. Break the tie by submit time and job // name to get a deterministic ordering, which is useful for unit tests. res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + if (res == 0) { res = s1.getName().compareTo(s2.getName()); } } + return res; } - + + /** + * Sort both shares arrays according to the cluster shares (the first index + * of the inner arrays, e.g. {@code shares1[x][0]}). + * + * @param shares1 the first shares array + * @param shares2 the second shares array + */ + @VisibleForTesting + void sortShares(float[][] shares1, float[][]shares2) { + // sort order desceding by resource share + Arrays.sort(shares1, (float[] o1, float[] o2) -> + (int) Math.signum(o2[0] - o1[0])); + Arrays.sort(shares2, (float[] o1, float[] o2) -> + (int) Math.signum(o2[0] - o1[0])); + } + /** - * Calculates and orders a resource's share of a pool in terms of two vectors. - * The shares vector contains, for each resource, the fraction of the pool that - * it takes up. The resourceOrder vector contains an ordering of resources - * by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>, - * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY]. + * Calculate a resource's share of the cluster. The shares array will be + * populated with the fraction of the cluster that {@code resource} + * represents, for each resource. 
If the clusters resources are 100 MB and + * 10 vcores, and the usage ({@code resource}) is 10 MB and 5 CPU, the + * shares will be 0.1 and 0.5. + * + * The shares array must be n x 2, where n is the number of + * resource types. Only the first index of the inner arrays in the shares + * array will be used, e.g. {@code shares[x][0]}. + * + * @param resource the resource for which to calculate min shares + * @param cluster the cluster resources + * @param shares the shares array + * @param weights the resource weights + * @return the index of the largest share */ - void calculateShares(Resource resource, Resource pool, - ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) { - shares.setWeight(MEMORY, (float)resource.getMemorySize() / - (pool.getMemorySize() * weights.getWeight(MEMORY))); - shares.setWeight(CPU, (float)resource.getVirtualCores() / - (pool.getVirtualCores() * weights.getWeight(CPU))); - // sort order vector by resource share - if (resourceOrder != null) { - if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) { - resourceOrder[0] = MEMORY; - resourceOrder[1] = CPU; - } else { - resourceOrder[0] = CPU; - resourceOrder[1] = MEMORY; + @VisibleForTesting + int calculateShares(Resource resource, Resource cluster, + float[][] shares, ResourceWeights weights) { + ResourceInformation[] resourceInfo = resource.getResources(); + ResourceInformation[] clusterInfo = cluster.getResources(); + int max = 0; + + for (int i = 0; i < clusterInfo.length; i++) { + shares[i][0] = resourceInfo[i].getValue() / + (clusterInfo[i].getValue() * weights.getWeight(i)); + + if (shares[i][0] > shares[max][0]) { + max = i; } } + + return max; } - private int compareShares(ResourceWeights shares1, ResourceWeights shares2, - ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) { - for (int i = 0; i < resourceOrder1.length; i++) { - int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i]) - - shares2.getWeight(resourceOrder2[i])); + /** + * Calculate a resource's min shares. The shares array will be populated + * with the ratio of usage compared to min share for each resource. + * If the min shares are 5 MB and 10 vcores, and the usage + * ({@code resource}) is 10 MB and 5 CPU, the ratios will be 2 and 0.5. + * + * The shares array must be n x 2, where n is the number of + * resource types. Only the second index of the inner arrays in the shares + * array will be used, e.g. {@code shares[x][1]}. + * + * @param resource the resource for which to calculate min shares + * @param minShare the min share + * @param shares the shares array + * @param weights the resource weights + */ + @VisibleForTesting + void calculateMinShares(Resource resource, Resource minShare, + float[][] shares, ResourceWeights weights) { + ResourceInformation[] resourceInfo = resource.getResources(); + ResourceInformation[] minShareInfo = minShare.getResources(); + + for (int i = 0; i < minShareInfo.length; i++) { + shares[i][1] = resourceInfo[i].getValue() / + (minShareInfo[i].getValue() * weights.getWeight(i)); + } + } + + /** + * Compare the two shares arrays and return -1, 0, or 1 if the first array + * is less than, equal to, or greater than the second array, respectively. + * If {@code cluster} is true, the comparison is done based on the fair + * shares (stored in index 0 of the inner arrays in the shares array). If + * false, the comparison is done based on the min shares (stored in index 1 + * of the inner arrays in the shares arrays). 
The shares arrays are assumed + * to be sorted in descending order. + * + * @param shares1 the first shares array + * @param shares2 the second shares array + * @param cluster whether to compare based on cluster shares + * @return -1, 0, or 1 if the first array is less than, equal to, or + * greater than the second array, respectively + */ + @VisibleForTesting + int compareShares(float[][] shares1, float[][] shares2, + boolean cluster) { + int index = cluster ? 0 : 1; + int ret = 0; + + for (int i = 0; i < shares1.length; i++) { + ret = (int) Math.signum(shares1[i][index] - shares2[i][index]); + if (ret != 0) { - return ret; + break; } } - return 0; + + return ret; } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index c3ec47a1eba..d17d5cc8314 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -26,7 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; @@ -43,6 +43,7 @@ @Unstable public class FairSharePolicy extends SchedulingPolicy { private static final Log LOG = LogFactory.getLog(FairSharePolicy.class); + private static String MEMORY = ResourceInformation.MEMORY_MB.getName(); @VisibleForTesting public static final String NAME = "fair"; private static final DefaultResourceCalculator RESOURCE_CALCULATOR = @@ -98,8 +99,8 @@ public int compare(Schedulable s1, Schedulable s2) { minShareRatio2 = (double) resourceUsage2.getMemorySize() / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize(); - weight1 = s1.getWeights().getWeight(ResourceType.MEMORY); - weight2 = s2.getWeights().getWeight(ResourceType.MEMORY); + weight1 = s1.getWeights().getWeight(MEMORY); + weight2 = s2.getWeights().getWeight(MEMORY); if (weight1 > 0.0 && weight2 > 0.0) { useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1; useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2; @@ -163,14 +164,13 @@ public Resource getHeadroom(Resource queueFairShare, @Override public void computeShares(Collection schedulables, Resource totalResources) { - ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY); + ComputeFairShares.computeShares(schedulables, totalResources, MEMORY); } @Override public void computeSteadyShares(Collection queues, Resource totalResources) { - ComputeFairShares.computeSteadyShares(queues, totalResources, - ResourceType.MEMORY); + 
ComputeFairShares.computeSteadyShares(queues, totalResources, MEMORY); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java index 9561234d633..bedc7627ed3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java @@ -34,11 +34,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -137,7 +137,8 @@ protected void checkDefaultQueueBeforePlanFollowerRun() { } @Override protected void verifyCapacity(Queue defQ) { - assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9); + assertTrue(((FSQueue) defQ).getWeights() + .getWeight(ResourceInformation.MEMORY_MB.getName()) > 0.9); } @Override @@ -174,7 +175,8 @@ protected void assertReservationQueueExists(ReservationId r, assertNotNull(q); // For now we are setting both to same weight Assert.assertEquals(expectedCapacity, - q.getWeights().getWeight(ResourceType.MEMORY), 0.01); + q.getWeights().getWeight(ResourceInformation.MEMORY_MB.getName()), + 0.01); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java index f420b9ecd22..059c71134d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java @@ -17,39 +17,31 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.junit.Assert; import org.junit.Test; public class TestResourceWeights { - @Test(timeout=3000) + @Test public void testWeights() { ResourceWeights rw1 = new ResourceWeights(); 
     Assert.assertEquals("Default CPU weight should be 0.0f.", 0.0f,
-        rw1.getWeight(ResourceType.CPU), 0.00001f);
+        rw1.getWeight(ResourceInformation.VCORES.getName()), 0.00001f);
     Assert.assertEquals("Default memory weight should be 0.0f", 0.0f,
-        rw1.getWeight(ResourceType.MEMORY), 0.00001f);
+        rw1.getWeight(ResourceInformation.MEMORY_MB.getName()), 0.00001f);
 
     ResourceWeights rw2 = new ResourceWeights(2.0f);
     Assert.assertEquals("The CPU weight should be 2.0f.", 2.0f,
-        rw2.getWeight(ResourceType.CPU), 0.00001f);
+        rw2.getWeight(ResourceInformation.VCORES.getName()), 0.00001f);
     Assert.assertEquals("The memory weight should be 2.0f", 2.0f,
-        rw2.getWeight(ResourceType.MEMORY), 0.00001f);
+        rw2.getWeight(ResourceInformation.MEMORY_MB.getName()), 0.00001f);
 
-    // set each individually
-    ResourceWeights rw3 = new ResourceWeights(1.5f, 2.0f);
-    Assert.assertEquals("The CPU weight should be 2.0f", 2.0f,
-        rw3.getWeight(ResourceType.CPU), 0.00001f);
-    Assert.assertEquals("The memory weight should be 1.5f", 1.5f,
-        rw3.getWeight(ResourceType.MEMORY), 0.00001f);
-
-    // reset weights
-    rw3.setWeight(ResourceType.CPU, 2.5f);
-    Assert.assertEquals("The CPU weight should be set to 2.5f.", 2.5f,
-        rw3.getWeight(ResourceType.CPU), 0.00001f);
-    rw3.setWeight(ResourceType.MEMORY, 4.0f);
-    Assert.assertEquals("The memory weight should be set to 4.0f.", 4.0f,
-        rw3.getWeight(ResourceType.MEMORY), 0.00001f);
+    ResourceWeights rwn = ResourceWeights.NEUTRAL;
+    Assert.assertEquals("The NEUTRAL CPU weight should be 1.0f.", 1.0f,
+        rwn.getWeight(ResourceInformation.VCORES.getName()), 0.00001f);
+    Assert.assertEquals("The NEUTRAL memory weight should be 1.0f", 1.0f,
+        rwn.getWeight(ResourceInformation.MEMORY_MB.getName()), 0.00001f);
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
index 4f3ccb2acd4..bba09f1e080 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
@@ -20,11 +20,11 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.junit.Assert;
 
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares;
 import org.junit.Before;
 
@@ -52,7 +52,7 @@ public void testEqualSharing() {
     scheds.add(new FakeSchedulable());
     scheds.add(new FakeSchedulable());
     ComputeFairShares.computeShares(scheds,
-        Resources.createResource(40), ResourceType.MEMORY);
+        Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
     verifyMemoryShares(10, 10, 10, 10);
   }
 
@@ -70,7 +70,7 @@ public void testLowMaxShares() {
     scheds.add(new FakeSchedulable(0, 11));
     scheds.add(new FakeSchedulable(0, 3));
     ComputeFairShares.computeShares(scheds,
-
Resources.createResource(40), ResourceType.MEMORY); + Resources.createResource(40), ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(13, 13, 11, 3); } @@ -90,7 +90,7 @@ public void testMinShares() { scheds.add(new FakeSchedulable(0)); scheds.add(new FakeSchedulable(2)); ComputeFairShares.computeShares(scheds, - Resources.createResource(40), ResourceType.MEMORY); + Resources.createResource(40), ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(20, 18, 0, 2); } @@ -105,7 +105,7 @@ public void testWeightedSharing() { scheds.add(new FakeSchedulable(0, 1.0)); scheds.add(new FakeSchedulable(0, 0.5)); ComputeFairShares.computeShares(scheds, - Resources.createResource(45), ResourceType.MEMORY); + Resources.createResource(45), ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(20, 10, 10, 5); } @@ -123,7 +123,7 @@ public void testWeightedSharingWithMaxShares() { scheds.add(new FakeSchedulable(0, 30, 1.0)); scheds.add(new FakeSchedulable(0, 20, 0.5)); ComputeFairShares.computeShares(scheds, - Resources.createResource(45), ResourceType.MEMORY); + Resources.createResource(45), ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(10, 11, 16, 8); } @@ -142,7 +142,7 @@ public void testWeightedSharingWithMinShares() { scheds.add(new FakeSchedulable(5, 1.0)); scheds.add(new FakeSchedulable(15, 0.5)); ComputeFairShares.computeShares(scheds, - Resources.createResource(45), ResourceType.MEMORY); + Resources.createResource(45), ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(20, 5, 5, 15); } @@ -158,7 +158,8 @@ public void testLargeShares() { scheds.add(new FakeSchedulable()); scheds.add(new FakeSchedulable()); ComputeFairShares.computeShares(scheds, - Resources.createResource(40 * million), ResourceType.MEMORY); + Resources.createResource(40 * million), + ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million); } @@ -168,7 +169,7 @@ public void testLargeShares() { @Test public void testEmptyList() { ComputeFairShares.computeShares(scheds, - Resources.createResource(40), ResourceType.MEMORY); + Resources.createResource(40), ResourceInformation.MEMORY_MB.getName()); verifyMemoryShares(); } @@ -186,7 +187,7 @@ public void testCPU() { scheds.add(new FakeSchedulable(Resources.createResource(0, 15), new ResourceWeights(0.5f))); ComputeFairShares.computeShares(scheds, - Resources.createResource(0, 45), ResourceType.CPU); + Resources.createResource(0, 45), ResourceInformation.VCORES.getName()); verifyCPUShares(20, 5, 5, 15); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 63dcade4c46..b6e61cf478c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -123,6 +122,7 @@ import org.xml.sax.SAXException; import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.ResourceInformation; @SuppressWarnings("unchecked") public class TestFairScheduler extends FairSchedulerTestBase { @@ -1984,7 +1984,8 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception { // assert that the steady fair share is 1/4th node1's capacity assertEquals(capacity / 4, leaf.getSteadyFairShare().getMemorySize()); // assert weights are equal for both the user queues - assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0); + assertEquals(1.0, leaf.getWeights() + .getWeight(ResourceInformation.MEMORY_MB.getName()), 0); } } } @@ -5277,7 +5278,7 @@ public void testDumpState() throws IOException { child1.updateDemand(); String childQueueString = "{Name: root.parent.child1," - + " Weight: ," + + " Weight: ," + " Policy: fair," + " FairShare: ," + " SteadyFairShare: ," @@ -5294,14 +5295,15 @@ public void testDumpState() throws IOException { + " LastTimeAtMinShare: " + clock.getTime() + "}"; - assertTrue(child1.dumpState().equals(childQueueString)); + assertEquals("Unexpected state dump string", + childQueueString, child1.dumpState()); FSParentQueue parent = scheduler.getQueueManager().getParentQueue("parent", false); parent.setMaxShare(resource); parent.updateDemand(); String parentQueueString = "{Name: root.parent," - + " Weight: ," + + " Weight: ," + " Policy: fair," + " FairShare: ," + " SteadyFairShare: ," @@ -5312,7 +5314,7 @@ public void testDumpState() throws IOException { + " MaxAMShare: 0.5," + " Runnable: 0}"; - assertTrue(parent.dumpState().equals( - parentQueueString + ", " + childQueueString)); + assertEquals("Unexpected state dump string", + parentQueueString + ", " + childQueueString, parent.dumpState()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java index 3719e2aee08..25ae0d45d8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -23,15 +23,22 @@ import static org.mockito.Mockito.when; import java.util.Comparator; +import java.util.Map; +import org.apache.curator.shaded.com.google.common.base.Joiner; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; /** @@ -39,6 +46,10 @@ * container before sched2 */ public class TestDominantResourceFairnessPolicy { + @BeforeClass + public void setup() { + addResources("test"); + } private Comparator createComparator(int clusterMem, int clusterCpu) { @@ -77,37 +88,52 @@ private Schedulable createSchedulable(int memUsage, int cpuUsage, @Test public void testSameDominantResource() { - assertTrue(createComparator(8000, 4).compare( - createSchedulable(1000, 1), - createSchedulable(2000, 1)) < 0); + Comparator c = createComparator(8000, 4); + Schedulable s1 = createSchedulable(1000, 1); + Schedulable s2 = createSchedulable(2000, 1); + + assertTrue("Comparison didn't return a value less than 0", + c.compare(s1, s2) < 0); } @Test public void testDifferentDominantResource() { - assertTrue(createComparator(8000, 8).compare( - createSchedulable(4000, 3), - createSchedulable(2000, 5)) < 0); + Comparator c = createComparator(8000, 8); + Schedulable s1 = createSchedulable(4000, 3); + Schedulable s2 = createSchedulable(2000, 5); + + assertTrue("Comparison didn't return a value less than 0", + c.compare(s1, s2) < 0); } @Test public void testOneIsNeedy() { - assertTrue(createComparator(8000, 8).compare( - createSchedulable(2000, 5, 0, 6), - createSchedulable(4000, 3, 0, 0)) < 0); + Comparator c = createComparator(8000, 8); + Schedulable s1 = createSchedulable(2000, 5, 0, 6); + Schedulable s2 = createSchedulable(4000, 3, 0, 0); + + assertTrue("Comparison didn't return a value less than 0", + c.compare(s1, s2) < 0); } @Test public void testBothAreNeedy() { - assertTrue(createComparator(8000, 100).compare( - // dominant share is 2000/8000 - createSchedulable(2000, 5), - // dominant share is 4000/8000 - createSchedulable(4000, 3)) < 0); - assertTrue(createComparator(8000, 100).compare( - // dominant min share is 2/3 - createSchedulable(2000, 5, 3000, 6), - // dominant min share is 4/5 - createSchedulable(4000, 3, 5000, 4)) < 0); + Comparator c = createComparator(8000, 100); + // dominant share is 2000/8000 + Schedulable s1 = createSchedulable(2000, 5); + // dominant share is 4000/8000 + Schedulable s2 = createSchedulable(4000, 3); + + assertTrue("Comparison didn't return a value less than 0", + c.compare(s1, s2) < 0); + + // dominant min share is 2/3 + s1 = createSchedulable(2000, 5, 3000, 6); + // dominant min share is 4/5 + s2 = createSchedulable(4000, 3, 5000, 4); + + assertTrue("Comparison didn't return a value less than 0", + c.compare(s1, s2) < 0); } @Test @@ -132,39 +158,156 @@ public void testEvenWeightsDifferentDominantResource() { @Test public void testUnevenWeightsSameDominantResource() { + ResourceWeights weights = new ResourceWeights(); + + weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 2.0f); + weights.setWeight(ResourceInformation.VCORES.getName(), 1.0f); + assertTrue(createComparator(8000, 8).compare( - createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)), + 
createSchedulable(3000, 1, weights), createSchedulable(2000, 1)) < 0); + + weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 1.0f); + weights.setWeight(ResourceInformation.VCORES.getName(), 2.0f); + assertTrue(createComparator(8000, 8).compare( - createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)), + createSchedulable(1000, 3, weights), createSchedulable(1000, 2)) < 0); } @Test public void testUnevenWeightsDifferentDominantResource() { - assertTrue(createComparator(8000, 8).compare( - createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)), + ResourceWeights weights = new ResourceWeights(); + + weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 1.0f); + weights.setWeight(ResourceInformation.VCORES.getName(), 2.0f); + + assertTrue("Resource comparison resulted in wrong order", + createComparator(8000, 8).compare( + createSchedulable(1000, 3, weights), createSchedulable(2000, 1)) < 0); - assertTrue(createComparator(8000, 8).compare( - createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)), + + weights.setWeight(ResourceInformation.MEMORY_MB.getName(), 2.0f); + weights.setWeight(ResourceInformation.VCORES.getName(), 1.0f); + + assertTrue("Resource comparison resulted in wrong order", + createComparator(8000, 8).compare( + createSchedulable(3000, 1, weights), createSchedulable(1000, 2)) < 0); } @Test + public void testSortShares() { + float[][] shares1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}}; + float[][] shares2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}}; + float[][] expected1 = {{0.4f, 0.1f}, {0.3f, 2.0f}, {0.2f, 1.0f}}; + float[][] expected2 = {{0.3f, 2.0f}, {0.25f, 0.1f}, {0.2f, 9.0f}}; + DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = + new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); + + comparator.sortShares(shares1, shares2); + + for (int i = 0; i < shares1.length; i++) { + Assert.assertArrayEquals("The shares array was not sorted into the " + + "expected order: incorrect inner array encountered", + expected1[i], shares1[i], 0.00001f); + Assert.assertArrayEquals("The shares array was not sorted into the " + + "expected order: incorrect inner array encountered", + expected2[i], shares2[i], 0.00001f); + } + } + + @Test public void testCalculateShares() { + Map index = ResourceUtils.getResourceTypeIndex(); Resource used = Resources.createResource(10, 5); Resource capacity = Resources.createResource(100, 10); - ResourceType[] resourceOrder = new ResourceType[2]; - ResourceWeights shares = new ResourceWeights(); + float[][] shares = new float[3][2]; + DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = + new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); + + used.setResourceValue("test", 2); + capacity.setResourceValue("test", 5); + + int dominant = comparator.calculateShares(used, capacity, shares, + ResourceWeights.NEUTRAL); + + assertEquals("Calculated cluster share for memory (10MB out of 100MB) is " + + "incorrect", 0.1, + shares[index.get(ResourceInformation.MEMORY_MB.getName())][0], .00001); + assertEquals("Calculated cluster share for vcores (5 out of 10) is " + + "incorrect", 0.5, + shares[index.get(ResourceInformation.VCORES.getName())][0], .00001); + assertEquals("Calculated cluster share for test resource (2 out of 5) is " + + "incorrect", 0.4, shares[index.get("test")][0], .00001); + assertEquals("The wrong dominant resource index was returned", + index.get(ResourceInformation.VCORES.getName()).intValue(), + dominant); + } + + @Test + 
public void testCalculateMinShares() { + Map index = ResourceUtils.getResourceTypeIndex(); + Resource used = Resources.createResource(10, 5); + Resource minShares = Resources.createResource(5, 10); + float[][] shares = new float[3][2]; DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); - comparator.calculateShares(used, capacity, shares, resourceOrder, + + used.setResourceValue("test", 2); + minShares.setResourceValue("test", 0); + + comparator.calculateMinShares(used, minShares, shares, ResourceWeights.NEUTRAL); - - assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001); - assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001); - assertEquals(ResourceType.CPU, resourceOrder[0]); - assertEquals(ResourceType.MEMORY, resourceOrder[1]); + + assertEquals("Calculated min share for memory (10MB out of 5MB) is " + + "incorrect", 2.0, + shares[index.get(ResourceInformation.MEMORY_MB.getName())][1], .00001); + assertEquals("Calculated min share for vcores (5 out of 10) is " + + "incorrect", 0.5, + shares[index.get(ResourceInformation.VCORES.getName())][1], .00001); + assertEquals("Calculated min share for test resource (0 out of 5) is " + + "incorrect", 0.0, shares[index.get("test")][0], .00001); + } + + @Test + public void testCompareShares() { + float[][] shares1 = {{0.4f, 0.1f}, {0.3f, 2.0f}, {0.2f, 1.0f}}; + float[][] shares2 = {{0.3f, 2.0f}, {0.2f, 0.1f}, {0.2f, 9.0f}}; + float[][] shares3 = {{0.3f, 1.0f}, {0.2f, 0.5f}, {0.1f, 2.0f}}; + DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = + new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); + + int ret = comparator.compareShares(shares1, shares2, true); + + assertEquals("Expected the first array to be larger because the first " + + "cluster share element is larger", 1, ret); + + ret = comparator.compareShares(shares2, shares1, true); + + assertEquals("Expected the first array to be smaller because the first " + + "cluster share element is smaller", -1, ret); + + ret = comparator.compareShares(shares1, shares1, true); + + assertEquals("Expected the arrays to be equal, since they're the same " + + "array", 0, ret); + + ret = comparator.compareShares(shares2, shares2, true); + + assertEquals("Expected the arrays to be equal, since they're the same " + + "array", 0, ret); + + ret = comparator.compareShares(shares3, shares3, true); + + assertEquals("Expected the arrays to be equal, since they're the same " + + "array", 0, ret); + + ret = comparator.compareShares(shares2, shares3, true); + + assertEquals("Expected the first array to be larger because the last " + + "cluster share element is larger, and all other elements are equal", + 1, ret); } @Test @@ -183,4 +326,12 @@ public void testCompareSchedulablesWithClusterResourceChanges(){ assertTrue(createComparator(8000, 6) .compare(schedulable1, schedulable2) < 0); } + + private void addResources(String... resources) { + Configuration conf = new Configuration(); + + // Add a third resource to the allowed set + conf.set(YarnConfiguration.RESOURCE_TYPES, Joiner.on(',').join(resources)); + ResourceUtils.resetResourceTypes(conf); + } }
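
Illustrative sketch (not part of the patch above): the reworked DominantResourceFairnessComparator replaces the ResourceWeights/resourceOrder pair with plain n x 2 float share arrays that calculateShares() fills, sortShares() orders descending by cluster share, and compareShares() walks element by element. The standalone Java example below (hypothetical names, plain JDK only, two resources with neutral weights) mirrors that flow to show how the ordering asserted in testDifferentDominantResource falls out.

import java.util.Arrays;

/** Standalone sketch of the dominant-share comparison scheme; not Hadoop code. */
public class DrfShareSketch {

  /** shares[i][0] = usage / (clusterCapacity * weight) for resource i. */
  static float[][] calculateShares(long[] usage, long[] cluster, float[] weights) {
    float[][] shares = new float[usage.length][2];
    for (int i = 0; i < usage.length; i++) {
      shares[i][0] = usage[i] / (cluster[i] * weights[i]);
    }
    return shares;
  }

  /** Sort descending by cluster share so index 0 holds the dominant resource. */
  static void sortByClusterShare(float[][] shares) {
    Arrays.sort(shares, (a, b) -> (int) Math.signum(b[0] - a[0]));
  }

  /** Compare element by element; the first differing share decides the order. */
  static int compareShares(float[][] s1, float[][] s2) {
    for (int i = 0; i < s1.length; i++) {
      int ret = (int) Math.signum(s1[i][0] - s2[i][0]);
      if (ret != 0) {
        return ret;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    long[] cluster = {8000, 8};            // e.g. memory in MB, vcores
    float[] weights = {1.0f, 1.0f};        // neutral weights
    float[][] s1 = calculateShares(new long[] {4000, 3}, cluster, weights);
    float[][] s2 = calculateShares(new long[] {2000, 5}, cluster, weights);
    sortByClusterShare(s1);                // dominant share of s1: 4000/8000 = 0.5
    sortByClusterShare(s2);                // dominant share of s2: 5/8 = 0.625
    System.out.println(compareShares(s1, s2)); // prints -1: s1 orders first
  }
}

Running the sketch prints -1: the schedulable with the smaller dominant share (0.5 for memory versus 0.625 for vcores) sorts first, which is the ordering the comparator in the patch is expected to produce.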