diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 6779a1b..f90a198 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -560,9 +560,10 @@ boolean isStarvedForFairShare() { } private boolean isStarved(Resource share) { - Resource desiredShare = Resources.min(scheduler.getResourceCalculator(), - scheduler.getClusterResource(), share, getDemand()); - return Resources.lessThan(scheduler.getResourceCalculator(), - scheduler.getClusterResource(), getResourceUsage(), desiredShare); + Resource desiredShare = Resources.min(policy.getResourceCalculator(), + scheduler.getClusterResource(), share, getDemand()); + Resource resourceUsage = getResourceUsage(); + return Resources.lessThan(policy.getResourceCalculator(), + scheduler.getClusterResource(), resourceUsage, desiredShare); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index cbc10e7..0b60cd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -490,21 +490,22 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { long fairShareTimeout = sched.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); + ResourceCalculator calc = sched.getPolicy().getResourceCalculator(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, + Resource target = Resources.componentwiseMin( sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, + resDueToMinShare = Resources.max(calc, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, - sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, + Resource target = Resources.componentwiseMin( + sched.getFairShare(), sched.getDemand()); + resDueToFairShare = Resources.max(calc, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, + Resource resToPreempt = Resources.max(calc, clusterResource, resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, + if (Resources.greaterThan(calc, clusterResource, resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index abdc834..160ba4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; + +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + import java.util.Collection; import java.util.Comparator; import java.util.concurrent.ConcurrentHashMap; @@ -98,6 +101,14 @@ public static SchedulingPolicy parse(String policy) public void initialize(Resource clusterCapacity) {} /** + * The {@link ResourceCalculator} returned by this method should be used + * for any calculations involving resources. + * + * @return ResourceCalculator instance to use + */ + public abstract ResourceCalculator getResourceCalculator(); + + /** * @return returns the name of {@link SchedulingPolicy} */ public abstract String getName(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 86d503b..45fbf98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; + +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*; @@ -44,8 +47,10 @@ public static final String NAME = "DRF"; - private DominantResourceFairnessComparator comparator = + private static final DominantResourceFairnessComparator COMPARATOR = new DominantResourceFairnessComparator(); + private static final DominantResourceCalculator CALCULATOR = + new DominantResourceCalculator(); @Override public String getName() { @@ -59,9 +64,14 @@ public byte getApplicableDepth() { @Override public Comparator getComparator() { - return comparator; + return COMPARATOR; } - + + @Override + public ResourceCalculator getResourceCalculator() { + return CALCULATOR; + } + @Override public void computeShares(Collection schedulables, Resource totalResources) { @@ -105,7 +115,7 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, @Override public void initialize(Resource clusterCapacity) { - comparator.setClusterCapacity(clusterCapacity); + COMPARATOR.setClusterCapacity(clusterCapacity); } public static class DominantResourceFairnessComparator implements Comparator { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 918db9d..3b9f07f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -43,7 +44,8 @@ public static final String NAME = "fair"; private static final DefaultResourceCalculator RESOURCE_CALCULATOR = new DefaultResourceCalculator(); - private FairShareComparator comparator = new FairShareComparator(); + private static final FairShareComparator COMPARATOR = + new FairShareComparator(); @Override public String getName() { @@ -111,7 +113,12 @@ else if (s1Needy && s2Needy) @Override public Comparator getComparator() { - return comparator; + return COMPARATOR; + } + + @Override + public ResourceCalculator getResourceCalculator() { + return RESOURCE_CALCULATOR; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 7d88933..a644e58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -27,6 +27,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; + + +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -36,7 +40,9 @@ public class FifoPolicy extends SchedulingPolicy { @VisibleForTesting public static final String NAME = "FIFO"; - private FifoComparator comparator = new FifoComparator(); + private static final FifoComparator COMPARATOR = new FifoComparator(); + private static final DefaultResourceCalculator CALCULATOR = + new DefaultResourceCalculator(); @Override public String getName() { @@ -68,7 +74,12 @@ public int compare(Schedulable s1, Schedulable s2) { @Override public Comparator getComparator() { - return comparator; + return COMPARATOR; + } + + @Override + public ResourceCalculator getResourceCalculator() { + return CALCULATOR; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 385ea0b..8ee6c07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -233,6 +233,68 @@ public void testIsStarvedForFairShare() throws Exception { assertFalse(queueB2.isStarvedForFairShare()); } + @Test (timeout = 5000) + public void testIsStarvedForFairShareDRF() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(".5"); + out.println(""); + out.println(""); + out.println(".5"); + out.println(""); + out.println("1"); + out.println("drf"); + out.println(""); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 7 * 1024, 1. Node update gives this all to A + createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + + QueueManager queueMgr = scheduler.getQueueManager(); + FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); + assertEquals(7 * 1024, queueA.getResourceUsage().getMemory()); + assertEquals(1, queueA.getResourceUsage().getVirtualCores()); + + // Queue B has 3 reqs : + // 1) 2 * 1024, 5 .. which will be granted + // 2) 1 * 1024, 1 .. which will be granted + // 3) 1 * 1024, 1 .. which wont + createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1); + createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2); + scheduler.update(); + for (int i = 0; i < 3; i ++) { + scheduler.handle(nodeEvent2); + } + + FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false); + assertEquals(3 * 1024, queueB.getResourceUsage().getMemory()); + assertEquals(6, queueB.getResourceUsage().getVirtualCores()); + + scheduler.update(); + + // Verify that Queue us not starved for fair share.. + assertFalse(queueB.isStarvedForFairShare()); + } + @Test public void testConcurrentAccess() { conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 56e8adc..6e76f66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1851,6 +1851,149 @@ public void testPreemptionDecision() throws Exception { } @Test +/** + * Tests the timing of decision to preempt tasks. + */ + public void testPreemptionDecisionWithDRF() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,1vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,2vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,3vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,2vcores"); + out.println(""); + out.println("5"); + out.println("10"); + out.println(".5"); + out.println("drf"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + // Queue A and B each request three containers + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app5 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app6 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + } + + // Now new requests arrive from queues C and D + ApplicationAttemptId app7 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + ApplicationAttemptId app8 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + ApplicationAttemptId app9 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + ApplicationAttemptId app10 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1); + ApplicationAttemptId app11 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2); + ApplicationAttemptId app12 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3); + + scheduler.update(); + + FSLeafQueue schedC = + scheduler.getQueueManager().getLeafQueue("queueC", true); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(schedD, clock.getTime()))); + // After minSharePreemptionTime has passed, they should want to preempt min + // share. + clock.tickSec(6); + Resource res = scheduler.resToPreempt(schedC, clock.getTime()); + assertEquals(1024, res.getMemory()); + // Demand = 3 + assertEquals(3, res.getVirtualCores()); + + res = scheduler.resToPreempt(schedD, clock.getTime()); + assertEquals(1024, res.getMemory()); + // Demand = 6, but min share = 2 + assertEquals(2, res.getVirtualCores()); + + // After fairSharePreemptionTime has passed, they should want to preempt + // fair share. + scheduler.update(); + clock.tickSec(6); + res = scheduler.resToPreempt(schedC, clock.getTime()); + assertEquals(1536, res.getMemory()); + assertEquals(3, res.getVirtualCores()); + + res = scheduler.resToPreempt(schedD, clock.getTime()); + assertEquals(1536, res.getMemory()); + // Demand = 6, but fair share = 3 + assertEquals(3, res.getVirtualCores()); + } + + @Test /** * Tests the various timing of decision to preempt tasks. */