diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java index 440c73cefdd..7413234279c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java @@ -32,10 +32,13 @@ * consumption lies at or below its fair share will never have its containers * preempted. */ -public class ComputeFairShares { +public final class ComputeFairShares { private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25; + private ComputeFairShares() { + } + /** * Compute fair share of the given schedulables.Fair share is an allocation of * shares considering only active schedulables ie schedulables which have @@ -101,19 +104,20 @@ public static void computeSteadyShares( * all Schedulables are only given their minShare) and an upper bound computed * to be large enough that too many slots are given (by doubling R until we * use more than totalResources resources). The helper method - * resourceUsedWithWeightToResourceRatio computes the total resources used with a - * given value of R. + * resourceUsedWithWeightToResourceRatio computes the total resources used + * with a given value of R. *

* The running time of this algorithm is linear in the number of Schedulables, - * because resourceUsedWithWeightToResourceRatio is linear-time and the number of - * iterations of binary search is a constant (dependent on desired precision). + * because resourceUsedWithWeightToResourceRatio is linear-time and the + * number of iterations of binary search is a constant (dependent on desired + * precision). */ private static void computeSharesInternal( Collection allSchedulables, Resource totalResources, ResourceType type, boolean isSteadyShare) { Collection schedulables = new ArrayList(); - int takenResources = handleFixedFairShares( + long takenResources = handleFixedFairShares( allSchedulables, schedulables, isSteadyShare, type); if (schedulables.isEmpty()) { @@ -122,12 +126,11 @@ private static void computeSharesInternal( // Find an upper bound on R that we can use in our binary search. We start // at R = 1 and double it until we have either used all the resources or we // have met all Schedulables' max shares. - int totalMaxShare = 0; + long totalMaxShare = 0; for (Schedulable sched : schedulables) { long maxShare = getResourceValue(sched.getMaxShare(), type); - totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare, - Integer.MAX_VALUE); - if (totalMaxShare == Integer.MAX_VALUE) { + totalMaxShare = safeAdd(maxShare, totalMaxShare); + if (totalMaxShare == Long.MAX_VALUE) { break; } } @@ -146,7 +149,7 @@ private static void computeSharesInternal( double right = rMax; for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) { double mid = (left + right) / 2.0; - int plannedResourceUsed = resourceUsedWithWeightToResourceRatio( + long plannedResourceUsed = resourceUsedWithWeightToResourceRatio( mid, schedulables, type); if (plannedResourceUsed == totalResource) { right = mid; @@ -171,14 +174,18 @@ private static void computeSharesInternal( /** * Compute the resources that would be used given a weight-to-resource ratio - * w2rRatio, for use in the computeFairShares algorithm as described in # + * w2rRatio, for use in the computeFairShares algorithm as described in + * {@link #computeSharesInternal}. */ - private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, + private static long resourceUsedWithWeightToResourceRatio(double w2rRatio, Collection schedulables, ResourceType type) { - int resourcesTaken = 0; + long resourcesTaken = 0; for (Schedulable sched : schedulables) { - int share = computeShare(sched, w2rRatio, type); - resourcesTaken += share; + long share = computeShare(sched, w2rRatio, type); + resourcesTaken = safeAdd(resourcesTaken, share); + if (resourcesTaken == Long.MAX_VALUE) { + break; + } } return resourcesTaken; } @@ -187,12 +194,12 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio, * Compute the resources assigned to a Schedulable given a particular * weight-to-resource ratio w2rRatio. */ - private static int computeShare(Schedulable sched, double w2rRatio, + private static long computeShare(Schedulable sched, double w2rRatio, ResourceType type) { double share = sched.getWeights().getWeight(type) * w2rRatio; share = Math.max(share, getResourceValue(sched.getMinShare(), type)); share = Math.min(share, getResourceValue(sched.getMaxShare(), type)); - return (int) share; + return (long) share; } /** @@ -200,11 +207,11 @@ private static int computeShare(Schedulable sched, double w2rRatio, * Returns the resources taken by fixed fairshare schedulables, * and adds the remaining to the passed nonFixedSchedulables. */ - private static int handleFixedFairShares( + private static long handleFixedFairShares( Collection schedulables, Collection nonFixedSchedulables, boolean isSteadyShare, ResourceType type) { - int totalResource = 0; + long totalResource = 0; for (Schedulable sched : schedulables) { long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type); @@ -216,15 +223,15 @@ private static int handleFixedFairShares( ? ((FSQueue)sched).getSteadyFairShare() : sched.getFairShare(), type); - totalResource = (int) Math.min((long)totalResource + (long)fixedShare, - Integer.MAX_VALUE); + totalResource = safeAdd(totalResource, fixedShare); } } return totalResource; } /** - * Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise. + * Get the fairshare for the {@link Schedulable} if it is fixed, + * -1 otherwise. * * The fairshare is fixed if either the maxShare is 0, weight is 0, * or the Schedulable is not active for instantaneous fairshare. @@ -275,4 +282,21 @@ private static void setResourceValue(long val, Resource resource, ResourceType t throw new IllegalArgumentException("Invalid resource"); } } + + /** + * Safely add two long values. The result will always be a valid long value. + * If the addition caused an overflow the return value will be set to + * Long.MAX_VALUE. + * @param a first long to add + * @param b second long to add + * @return result of the addition + */ + private static long safeAdd(long a, long b) { + long r = a + b; + // Overflow iff both arguments have the opposite sign of the result + if (((a ^ r) & (b ^ r)) < 0) { + r = Long.MAX_VALUE; + } + return r; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index 36ff85e5a46..13300b996de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -68,7 +68,18 @@ public FakeSchedulable(Resource minShare, ResourceWeights weights) { this(minShare, Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE), weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0); } - + + public FakeSchedulable(long minShare, long maxShare) { + this(minShare, maxShare, 1L); + } + + public FakeSchedulable(long minShare, long maxShare, float weights) { + this(Resources.createResource(minShare, 0), + Resources.createResource(maxShare, 0), + weights, Resources.createResource(0, 0), + Resources.createResource(0, 0), 0); + } + public FakeSchedulable(Resource minShare, Resource maxShare, ResourceWeights weight, Resource fairShare, Resource usage, long startTime) { this.minShare = minShare; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java index 4f3ccb2acd4..b86ba24e95b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.api.records.Resource; import org.junit.Assert; import org.apache.hadoop.yarn.util.resource.Resources; @@ -38,7 +39,7 @@ @Before public void setUp() throws Exception { - scheds = new ArrayList(); + scheds = new ArrayList<>(); } /** @@ -147,21 +148,71 @@ public void testWeightedSharingWithMinShares() { } /** - * Test that shares are computed accurately even when the number of slots is - * very large. + * Test that shares are computed accurately even when the number of + * resources is very large. + * Test adapted to accommodate long values for resources. */ @Test public void testLargeShares() { - int million = 1000 * 1000; - scheds.add(new FakeSchedulable()); - scheds.add(new FakeSchedulable()); - scheds.add(new FakeSchedulable()); - scheds.add(new FakeSchedulable()); + long giga = 1000L * 1000L * 1000L * 4L; + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, giga)); ComputeFairShares.computeShares(scheds, - Resources.createResource(40 * million), ResourceType.MEMORY); - verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million); + Resources.createResource(40 * giga), ResourceType.MEMORY); + verifyMemoryShares(giga, giga, giga, giga); } - + + /** + * Test overflow in the resources taken and upper bound. + */ + @Test + public void testLargeMinimums() { + long giga = 1000L * 1000L * 1000L * 4L; + scheds.add(new FakeSchedulable(Long.MAX_VALUE, Long.MAX_VALUE)); + scheds.add(new FakeSchedulable(giga, giga)); + ComputeFairShares.computeShares(scheds, + Resources.createResource(4 * giga), + ResourceType.MEMORY); + verifyMemoryShares(Long.MAX_VALUE, giga); + } + + /** + * Test overflow in the upper bound calculation for the binary search. + */ + @Test + public void testOverflowMaxShare() { + long giga = 1000L * 1000L * 1000L; + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, Long.MAX_VALUE)); + ComputeFairShares.computeShares(scheds, + Resources.createResource(2 * giga), + ResourceType.MEMORY); + verifyMemoryShares(giga, giga); + } + + /** + * Test overflow in the fixed share calculations. The 3th schedulable should + * not get any share as all resources are taken by the handleFixedShare() + * call. + * With the overflow it looked like there were more resources available then + * there really are. + * The values in the test might not be "real" but they show the overflow. + */ + @Test + public void testOverflowFixedShare() { + long giga = 1000L * 1000L * 1000L; + long minValue = Long.MAX_VALUE - 1L; + scheds.add(new FakeSchedulable(giga, giga, 0)); + scheds.add(new FakeSchedulable(minValue, Long.MAX_VALUE, 0)); + scheds.add(new FakeSchedulable(0L, giga)); + ComputeFairShares.computeShares(scheds, + Resources.createResource(1000L), + ResourceType.MEMORY); + verifyMemoryShares(giga, minValue, 0); + } + /** * Test that being called on an empty list doesn't confuse the algorithm. */ @@ -173,7 +224,7 @@ public void testEmptyList() { } /** - * Test that CPU works as well as memory + * Test that CPU works as well as memory. */ @Test public void testCPU() { @@ -193,10 +244,12 @@ public void testCPU() { /** * Check that a given list of shares have been assigned to this.scheds. */ - private void verifyMemoryShares(int... shares) { - Assert.assertEquals(scheds.size(), shares.length); + private void verifyMemoryShares(long... shares) { + Assert.assertEquals("Number of shares and schedulables are not consistent", + scheds.size(), shares.length); for (int i = 0; i < shares.length; i++) { - Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemorySize()); + Assert.assertEquals("Expected share number " + i + " in list wrong", + shares[i], scheds.get(i).getFairShare().getMemorySize()); } } @@ -204,9 +257,11 @@ private void verifyMemoryShares(int... shares) { * Check that a given list of shares have been assigned to this.scheds. */ private void verifyCPUShares(int... shares) { - Assert.assertEquals(scheds.size(), shares.length); + Assert.assertEquals("Number of shares and schedulables are not consistent", + scheds.size(), shares.length); for (int i = 0; i < shares.length; i++) { - Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores()); + Assert.assertEquals("Expected share number " + i + " in list wrong", + shares[i], scheds.get(i).getFairShare().getVirtualCores()); } } }