diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3b3f6ce..c09d7d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -134,7 +134,8 @@ public void setPolicy(SchedulingPolicy policy) @Override public void recomputeShares() { - policy.computeShares(getRunnableAppSchedulables(), getFairShare()); + policy.computeShares(getRunnableAppSchedulables(), getFairShare(), + getDynamicFairShare()); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 9af72a5..b9a7e00 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -61,7 +61,7 @@ public void addChildQueue(FSQueue child) { @Override public void recomputeShares() { - policy.computeShares(childQueues, getFairShare()); + 
policy.computeShares(childQueues, getFairShare(), getDynamicFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setFairShare(childQueue.getFairShare()); childQueue.recomputeShares(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 27a0075..639e1fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -274,6 +274,7 @@ protected synchronized void update() { rootQueue.updateDemand(); rootQueue.setFairShare(clusterResource); + rootQueue.setDynamicFairShare(clusterResource); // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); @@ -325,7 +326,7 @@ boolean isStarvedForMinShare(FSLeafQueue sched) { */ boolean isStarvedForFairShare(FSLeafQueue sched) { Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); + Resources.multiply(sched.getDynamicFairShare(), .5), sched.getDemand()); return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); } @@ -469,7 +470,7 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue); long fairShareTimeout = allocConf.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); - Resource 
resDueToFairShare = Resources.none(); + Resource resDueToDynamicFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); @@ -478,17 +479,18 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, - sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, - Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + sched.getDynamicFairShare(), sched.getDemand()); + resDueToDynamicFairShare = Resources.max(RESOURCE_CALCULATOR, + clusterResource, Resources.none(), + Resources.subtract(target, sched.getResourceUsage())); } Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, - resDueToMinShare, resDueToFairShare); + resDueToMinShare, resDueToDynamicFairShare); if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare - + ", resDueToFairShare = " + resDueToFairShare; + + ", resDueToDynamicFairShare = " + resDueToDynamicFairShare; LOG.info(message); } return resToPreempt; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 4f8ac1e..16f51f8 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -60,6 +60,7 @@ public abstract class Schedulable { /** Fair share assigned to this Schedulable */ private Resource fairShare = Resources.createResource(0); + private Resource dynamicFairShare = Resources.createResource(0); /** * Name of job/queue, used for debugging as well as for breaking ties in @@ -116,6 +117,28 @@ public Resource getFairShare() { return fairShare; } + /** Assign dynamic fair share to this Schedulable. */ + public void setDynamicFairShare(Resource dynamicFairShare) { + this.dynamicFairShare = dynamicFairShare; + } + + /** Get the dynamic fair share assigned to this Schedulable. */ + public Resource getDynamicFairShare() { + return dynamicFairShare; + } + + /** + * Returns true if queue has at least one app running. Always returns true for + * AppSchedulables. + */ + public boolean isActive() { + if (this instanceof FSQueue) { + FSQueue queue = (FSQueue) this; + return queue.getNumRunnableApps() > 0; + } + return true; + } + /** Convenient toString implementation for debugging. 
*/ @Override public String toString() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 1087c73..60e1227 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -134,11 +134,18 @@ public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) { * Computes and updates the shares of {@link Schedulable}s as per the * {@link SchedulingPolicy}, to be used later at schedule time. 
* - * @param schedulables {@link Schedulable}s whose shares are to be updated - * @param totalResources Total {@link Resource}s in the cluster + * @param schedulables + * {@link Schedulable}s whose shares are to be updated + * @param totalStaticResources + * Total {@link Resource}s in the parent queue of schedulables + * considering all sibling queues of parent + * @param totalDynamicResources + * Total {@link Resource}s in the parent queue of schedulables + * considering only active sibling queues of parent */ public abstract void computeShares( - Collection schedulables, Resource totalResources); + Collection schedulables, + Resource totalStaticResources, Resource totalDynamicResources); /** * Check if the resource usage is over the fair share under this policy diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java index 77dad49..4b56077 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; +import java.util.ArrayList; import java.util.Collection; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,12 +34,61 @@ public class ComputeFairShares { private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25; + + /** + * Compute static and dynamic fair share of the given schedulables. 
Static + * fair share is an allocation of shares to schedulables considering all + * schedulables. Dynamic fair share is an allocation of shares considering + * only active schedulables i.e. schedulables which have running apps. Static + * fair share could be used for predicting guaranteed finish time for + * applications in a queue. Dynamic fair share is used by the scheduler for + * triggering preemption. + * + * @param schedulables + * @param totalStaticResources + * @param totalDynamicResources + * @param type + */ + public static void computeShares( + Collection schedulables, + Resource totalStaticResources, Resource totalDynamicResources, + ResourceType type) { + if (schedulables.isEmpty()) { + return; + } + // First compute static w2rRatio by passing all schedulables + double w2rRatioStatic = getWeightToResourceRatio(schedulables, + totalStaticResources, type); + + // Next compute dynamic w2rRatio fair share by passing only active + // schedulables + Collection activeSchedulables = new ArrayList(); + for (Schedulable sched : schedulables) { + if (sched.isActive()) { + activeSchedulables.add(sched); + } else { + setResourceValue(0, sched.getDynamicFairShare(), type); + } + } + double w2rRatioDynamic = getWeightToResourceRatio(activeSchedulables, + totalDynamicResources, type); + + // Set static and dynamic fair share + for (Schedulable sched : schedulables) { + setResourceValue(computeShare(sched, w2rRatioStatic, type), + sched.getFairShare(), type); + if (activeSchedulables.contains(sched)) { + setResourceValue(computeShare(sched, w2rRatioDynamic, type), + sched.getDynamicFairShare(), type); + } + } + } /** - * Given a set of Schedulables and a number of slots, compute their weighted - * fair shares. The min and max shares and of the Schedulables are assumed to - * be set beforehand. We compute the fairest possible allocation of shares to - * the Schedulables that respects their min and max shares. 
+ * Given a set of Schedulables and a number of slots, compute their weight + * to resource ratio. The min and max shares of the Schedulables are + * assumed to be set beforehand. We compute the fairest possible allocation + * of shares to the Schedulables that respects their min and max shares. * * To understand what this method does, we must first define what weighted * fair sharing means in the presence of min and max shares. If there @@ -75,12 +125,9 @@ * because resourceUsedWithWeightToResourceRatio is linear-time and the number of * iterations of binary search is a constant (dependent on desired precision). */ - public static void computeShares( + private static double getWeightToResourceRatio( Collection schedulables, Resource totalResources, ResourceType type) { - if (schedulables.isEmpty()) { - return; - } // Find an upper bound on R that we can use in our binary search. We start // at R = 1 and double it until we have either used all the resources or we // have met all Schedulables' max shares. 
@@ -118,10 +165,8 @@ public static void computeShares( right = mid; } } - // Set the fair shares based on the value of R we've converged to - for (Schedulable sched : schedulables) { - setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type); - } + + return right; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index af674b9..1c2bf4d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -63,9 +63,10 @@ public byte getApplicableDepth() { @Override public void computeShares(Collection schedulables, - Resource totalResources) { + Resource totalStaticResources, Resource totalDynamicResources) { for (ResourceType type : ResourceType.values()) { - ComputeFairShares.computeShares(schedulables, totalResources, type); + ComputeFairShares.computeShares(schedulables, totalStaticResources, + totalDynamicResources, type); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java 
index c51852f..fd3792f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -115,8 +115,9 @@ else if (s1Needy && s2Needy) @Override public void computeShares(Collection schedulables, - Resource totalResources) { - ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY); + Resource totalStaticResources, Resource totalDynamicResources) { + ComputeFairShares.computeShares(schedulables, totalStaticResources, + totalDynamicResources, ResourceType.MEMORY); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 0f43097..4f07a38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -72,7 +72,8 @@ public int compare(Schedulable s1, Schedulable s2) { @Override public void computeShares(Collection schedulables, - Resource totalResources) { + Resource totalStaticResources, + Resource totalDynamicResources) { if (schedulables.isEmpty()) { return; } @@ -84,7 +85,8 @@ public void computeShares(Collection schedulables, earliest = schedulable; } 
} - earliest.setFairShare(Resources.clone(totalResources)); + earliest.setFairShare(Resources.clone(totalStaticResources)); + earliest.setDynamicFairShare(Resources.clone(totalDynamicResources)); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java index 9d8dd07..1a423d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java @@ -51,8 +51,8 @@ public void testEqualSharing() { scheds.add(new FakeSchedulable()); scheds.add(new FakeSchedulable()); scheds.add(new FakeSchedulable()); - ComputeFairShares.computeShares(scheds, - Resources.createResource(40), ResourceType.MEMORY); + ComputeFairShares.computeShares(scheds, Resources.createResource(40), + Resources.createResource(0), ResourceType.MEMORY); verifyMemoryShares(10, 10, 10, 10); } @@ -69,8 +69,8 @@ public void testLowMaxShares() { scheds.add(new FakeSchedulable(0, 50)); scheds.add(new FakeSchedulable(0, 11)); scheds.add(new FakeSchedulable(0, 3)); - ComputeFairShares.computeShares(scheds, - Resources.createResource(40), ResourceType.MEMORY); + ComputeFairShares.computeShares(scheds, Resources.createResource(40), + Resources.createResource(0), ResourceType.MEMORY); verifyMemoryShares(13, 13, 11, 3); } @@ -89,8 +89,8 @@ public void testMinShares() { scheds.add(new FakeSchedulable(18)); scheds.add(new FakeSchedulable(0)); scheds.add(new 
FakeSchedulable(2)); - ComputeFairShares.computeShares(scheds, - Resources.createResource(40), ResourceType.MEMORY); + ComputeFairShares.computeShares(scheds, Resources.createResource(40), + Resources.createResource(0), ResourceType.MEMORY); verifyMemoryShares(20, 18, 0, 2); } @@ -104,8 +104,8 @@ public void testWeightedSharing() { scheds.add(new FakeSchedulable(0, 1.0)); scheds.add(new FakeSchedulable(0, 1.0)); scheds.add(new FakeSchedulable(0, 0.5)); - ComputeFairShares.computeShares(scheds, - Resources.createResource(45), ResourceType.MEMORY); + ComputeFairShares.computeShares(scheds, Resources.createResource(45), + Resources.createResource(0), ResourceType.MEMORY); verifyMemoryShares(20, 10, 10, 5); } @@ -122,8 +122,8 @@ public void testWeightedSharingWithMaxShares() { scheds.add(new FakeSchedulable(0, 11, 1.0)); scheds.add(new FakeSchedulable(0, 30, 1.0)); scheds.add(new FakeSchedulable(0, 20, 0.5)); - ComputeFairShares.computeShares(scheds, - Resources.createResource(45), ResourceType.MEMORY); + ComputeFairShares.computeShares(scheds, Resources.createResource(45), + Resources.createResource(0), ResourceType.MEMORY); verifyMemoryShares(10, 11, 16, 8); } @@ -141,8 +141,8 @@ public void testWeightedSharingWithMinShares() { scheds.add(new FakeSchedulable(0, 1.0)); scheds.add(new FakeSchedulable(5, 1.0)); scheds.add(new FakeSchedulable(15, 0.5)); - ComputeFairShares.computeShares(scheds, - Resources.createResource(45), ResourceType.MEMORY); + ComputeFairShares.computeShares(scheds, Resources.createResource(45), + Resources.createResource(0), ResourceType.MEMORY); verifyMemoryShares(20, 5, 5, 15); } @@ -158,7 +158,8 @@ public void testLargeShares() { scheds.add(new FakeSchedulable()); scheds.add(new FakeSchedulable()); ComputeFairShares.computeShares(scheds, - Resources.createResource(40 * million), ResourceType.MEMORY); + Resources.createResource(40 * million), Resources.createResource(0), + ResourceType.MEMORY); verifyMemoryShares(10 * million, 10 * million, 10 * 
million, 10 * million); } @@ -167,9 +168,9 @@ public void testLargeShares() { */ @Test public void testEmptyList() { - ComputeFairShares.computeShares(scheds, - Resources.createResource(40), ResourceType.MEMORY); - verifyMemoryShares(); + ComputeFairShares.computeShares(scheds, Resources.createResource(40), + Resources.createResource(0), ResourceType.MEMORY); + verifyMemoryShares(); } /** @@ -185,8 +186,8 @@ public void testCPU() { new ResourceWeights(1.0f))); scheds.add(new FakeSchedulable(Resources.createResource(0, 15), new ResourceWeights(0.5f))); - ComputeFairShares.computeShares(scheds, - Resources.createResource(0, 45), ResourceType.CPU); + ComputeFairShares.computeShares(scheds, Resources.createResource(0, 45), + Resources.createResource(0), ResourceType.CPU); verifyCPUShares(20, 5, 5, 15); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 33ec318..a065d81 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1285,7 +1285,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { scheduler.update(); Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() .getLeafQueue("queueA.queueA2", false), clock.getTime()); - assertEquals(2980, toPreempt.getMemory()); + assertEquals(3277, toPreempt.getMemory()); // verify if the 3 containers required by queueA2 are preempted in 
the same // round diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerDynamicFairShare.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerDynamicFairShare.java new file mode 100644 index 0000000..3706650 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerDynamicFairShare.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; + +public class TestFairSchedulerDynamicFairShare extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath(); + + @Before + public void setup() throws IOException { + conf = createConfiguration(); + scheduler = new FairScheduler(); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler.setRMContext(resourceManager.getRMContext()); + } + + private void configureClusterWithQueuesAndOneNode(int mem, String policy) + throws IOException { + configureClusterWithQueuesAndOneNode(mem, 0, policy); + } + + private void configureClusterWithQueuesAndOneNode(int mem, int vCores, + String policy) throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" 8"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + 
out.println(" "); + out.println(" 1"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.println("" + policy + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(mem, vCores), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + } + + @Test + public void testDynamicFairShareNoAppsRunning() throws IOException { + int nodeCapacity = 16 * 1024; + configureClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + scheduler.update(); + // No apps are running in the cluster,verify if dynamic fair share is zero + // for all + // queues under parentA and parentB. Also verify if static fair share is set + // correctly. + Collection leafQueues = scheduler.getQueueManager() + .getLeafQueues(); + + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().startsWith("root.parentA")) { + assertEquals(0, (double) leaf.getDynamicFairShare().getMemory() + / nodeCapacity * 100, 0); + assertEquals(20, (double) leaf.getFairShare().getMemory() + / nodeCapacity * 100, 0.1); + } else if (leaf.getName().startsWith("root.parentB")) { + assertEquals(0, (double) leaf.getDynamicFairShare().getMemory() + / nodeCapacity * 100, 0.1); + assertEquals(5, (double) leaf.getFairShare().getMemory() / nodeCapacity + * 100, 0.1); + } + } + } + + @Test + public void testDynamicFairShareOneAppRunning() throws IOException { + int nodeCapacity = 16 * 1024; + configureClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run a app in a childA1. Verify whether static fair share is 20%(80/4) and + // dynamic fair share is 100% in childA1, since it is the only active queue. + // Also verify if dynamic fair share is 0 for childA2. since no app is + // running in it. 
+ createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + + scheduler.update(); + assertEquals( + 20, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA1", false).getFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + assertEquals( + 100, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA1", false).getDynamicFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + assertEquals( + 0, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA2", false).getDynamicFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + } + + @Test + public void testDynamicFairShareMultipleActiveQueuesUnderSameParent() + throws IOException { + int nodeCapacity = 16 * 1024; + configureClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run apps in childA1,childA2,childA3 + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + createSchedulingRequest(2 * 1024, "root.parentA.childA2", "user2"); + createSchedulingRequest(2 * 1024, "root.parentA.childA3", "user3"); + + scheduler.update(); + + // Verify if static fair share is 80 / 4 = 20, while dynamic fair share + // is 100 / 3 = 33% + for (int i = 1; i <= 3; i++) { + assertEquals( + 20, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false).getFairShare() + .getMemory() + / nodeCapacity * 100, .9); + assertEquals( + 33, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false) + .getDynamicFairShare().getMemory() + / nodeCapacity * 100, .9); + } + } + + @Test + public void testDynamicFairShareMultipleActiveQueuesUnderDifferentParent() + throws IOException { + int nodeCapacity = 16 * 1024; + configureClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run apps in childA1,childA2 which are under parentA + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2"); + + // Run app in 
childB1 which is under parentB + createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3"); + + // Run app in root.default queue + createSchedulingRequest(1 * 1024, "root.default", "user4"); + + scheduler.update(); + + // The two active child queues under parentA would + // get dynamic fair share of 80/2=40% + for (int i = 1; i <= 2; i++) { + assertEquals( + 40, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false) + .getDynamicFairShare().getMemory() + / nodeCapacity * 100, .9); + } + + // The child queue under parentB would get a dynamic fair share of 10%, + // basically all of parentB's fair share + assertEquals( + 10, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentB.childB1", false).getDynamicFairShare() + .getMemory() + / nodeCapacity * 100, .9); + } + + @Test + public void testDynamicFairShareResetsToZeroWhenAppsComplete() + throws IOException { + int nodeCapacity = 16 * 1024; + configureClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run apps in childA1,childA2 which are under parentA + ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024, + "root.parentA.childA1", "user1"); + ApplicationAttemptId app2 = createSchedulingRequest(3 * 1024, + "root.parentA.childA2", "user2"); + + scheduler.update(); + + // Verify if both the active queues under parentA get 50% dynamic fair + // share + for (int i = 1; i <= 2; i++) { + assertEquals( + 50, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false) + .getDynamicFairShare().getMemory() + / nodeCapacity * 100, .9); + } + // Let app under childA1 complete. This should cause the dynamic fair share + // of queue childA1 to be reset to zero,since the queue has no apps running. + // Queue childA2's fair share would increase to 100% since its the only + // active queue. 
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + app1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + + assertEquals( + 0, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA1", false).getDynamicFairShare() + .getMemory() + / nodeCapacity * 100, 0); + assertEquals( + 100, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA2", false).getDynamicFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + } + + @Test + public void testDynamicFairShareWithDRFMultipleActiveQueuesUnderDifferentParent() + throws IOException { + int nodeMem = 16 * 1024; + int nodeVCores = 10; + configureClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf"); + + // Run apps in childA1,childA2 which are under parentA + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2"); + + // Run app in childB1 which is under parentB + createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3"); + + // Run app in root.default queue + createSchedulingRequest(1 * 1024, "root.default", "user4"); + + scheduler.update(); + + // The two active child queues under parentA would + // get 80/2=40% memory and vcores + for (int i = 1; i <= 2; i++) { + assertEquals( + 40, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false) + .getDynamicFairShare().getMemory() + / nodeMem * 100, .9); + assertEquals( + 40, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false) + .getDynamicFairShare().getVirtualCores() + / nodeVCores * 100, .9); + } + + // The only active child queue under parentB would get 10% memory and vcores + assertEquals( + 10, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentB.childB1", false).getDynamicFairShare() + .getMemory() + / nodeMem * 100, .9); + assertEquals( + 10, + (double) 
scheduler.getQueueManager() + .getLeafQueue("root.parentB.childB1", false).getDynamicFairShare() + .getVirtualCores() + / nodeVCores * 100, .9); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java index 4636c5b..c34fe90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java @@ -36,7 +36,7 @@ public void setup() { } private void testComputeShares(SchedulingPolicy policy) { - policy.computeShares(schedulables, Resources.none()); + policy.computeShares(schedulables, Resources.none(), Resources.none()); } @Test (timeout = 1000)