diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 3778cba..2111e0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -80,6 +80,7 @@
   private Resource preemptedResources = Resources.createResource(0);
   private RMContainerComparator comparator = new RMContainerComparator();
   private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+  private long lastTimeAtFairShareThreshold;
 
   // Used to record node reservation by an app.
   // Key = RackName, Value = Set of Nodes reserved by app on rack
@@ -107,6 +108,7 @@ public FSAppAttempt(FairScheduler scheduler,
     this.startTime = scheduler.getClock().getTime();
     this.priority = Priority.newInstance(1);
     this.resourceWeights = new ResourceWeights();
+    this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
   }
 
   public ResourceWeights getResourceWeights() {
@@ -937,4 +939,26 @@ public RMContainer preemptContainer() {
     }
     return toBePreempted;
   }
+
+  public long getLastTimeAtFairShareThreshold() {
+    return lastTimeAtFairShareThreshold;
+  }
+
+  public void setLastTimeAtFairShareThreshold(
+      long lastTimeAtFairShareThreshold) {
+    this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
+  }
+
+  /**
+   * Is this app attempt starved below its fair share preemption threshold?
+   */
+  public boolean isAppStarvedForFairShare() {
+    Resource needed = Resources.multiply(getFairShare(),
+        getQueue().getFairSharePreemptionThreshold());
+    Resource desiredShare = Resources.min(
+        getQueue().policy.getResourceCalculator(),
+        scheduler.getClusterResource(), needed, getDemand());
+    return Resources.lessThan(getQueue().policy.getResourceCalculator(),
+        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
+  }
 }
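The check above mirrors the queue-level isStarvedForFairShare(): an app counts as starved while its usage sits below min(fairShare * fairSharePreemptionThreshold, demand). A minimal sketch of the same arithmetic on plain memory values (illustrative only; the patch itself works through Resource objects and the policy's ResourceCalculator):

  // Per-app fair-share starvation check, memory-only for brevity.
  static boolean isAppStarvedForFairShare(long usageMb, long fairShareMb,
      double preemptionThreshold, long demandMb) {
    // An app never needs more than it demands, so cap the threshold share.
    long desiredMb = Math.min(
        (long) (fairShareMb * preemptionThreshold), demandMb);
    return usageMb < desiredMb;
  }

For app03 in the new test below (fair share 1536mb, threshold .5, demand 4096mb, usage 0mb) this gives desiredMb = 768, and 0 < 768, so the app is starved.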
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index ca5a146..516478f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -271,13 +271,17 @@ public void updateDemand() {
     try {
       for (FSAppAttempt sched : runnableApps) {
         if (Resources.equals(demand, maxRes)) {
-          break;
+          // Demand is capped at maxRes; still update the app's own demand.
+          sched.updateDemand();
+          continue;
         }
         updateDemandForApp(sched, maxRes);
       }
       for (FSAppAttempt sched : nonRunnableApps) {
         if (Resources.equals(demand, maxRes)) {
-          break;
+          // Same as above: keep the per-app demand up to date.
+          sched.updateDemand();
+          continue;
         }
         updateDemandForApp(sched, maxRes);
       }
@@ -511,7 +515,8 @@ public void recoverContainer(Resource clusterResource,
   }
 
   /**
-   * Update the preemption fields for the queue, i.e. the times since last was
+   * Update the preemption fields for the queue and for each runnable app in
+   * the queue, i.e. the times since each was last
    * at its guaranteed share and over its fair share threshold.
    */
   public void updateStarvationStats() {
@@ -522,6 +527,12 @@ public void updateStarvationStats() {
     if (!isStarvedForFairShare()) {
       setLastTimeAtFairShareThreshold(now);
     }
+    // Refresh the starvation stats of each runnable app as well.
+    for (FSAppAttempt app : runnableApps) {
+      if (!app.isAppStarvedForFairShare()) {
+        app.setLastTimeAtFairShareThreshold(now);
+      }
+    }
   }
 
   /** Allows setting weight for a dynamically created queue
@@ -567,4 +578,8 @@ private boolean isStarved(Resource share) {
     return Resources.lessThan(policy.getResourceCalculator(),
         scheduler.getClusterResource(), resourceUsage, desiredShare);
   }
+
+  public List<FSAppAttempt> getRunnableApps() {
+    return runnableApps;
+  }
 }
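updateStarvationStats() now also resets each runnable app's lastTimeAtFairShareThreshold whenever that app is not starved, so the timestamp always marks the start of the app's current continuous starvation period. A hedged sketch of that bookkeeping (names illustrative, not the patch's API):

  // While the app is healthy the timestamp keeps moving forward; once the
  // app becomes starved it stops being refreshed, and (now - timestamp)
  // grows into the app's continuous starvation age.
  final class AppStarvationTimer {
    private long lastTimeAtFairShareThreshold;

    AppStarvationTimer(long now) {
      // Seed at creation, as the FSAppAttempt constructor change does;
      // otherwise a brand-new app would look starved since the epoch.
      lastTimeAtFairShareThreshold = now;
    }

    void onUpdateStarvationStats(boolean starved, long now) {
      if (!starved) {
        lastTimeAtFairShareThreshold = now;
      }
    }

    boolean starvedLongerThan(long timeoutMs, long now) {
      return now - lastTimeAtFairShareThreshold > timeoutMs;
    }
  }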
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 9c16e49..1ab41a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -539,6 +539,23 @@ protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
     }
     Resource deficit = Resources.max(calc, clusterResource,
         resDueToMinShare, resDueToFairShare);
+
+    // If the queue as a whole needs no preemption, check each app in the queue
+    if (!Resources.greaterThan(calc, clusterResource, deficit, Resources.none())) {
+      for (FSAppAttempt app : sched.getRunnableApps()) {
+        if (curTime - app.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
+          Resource target = Resources.min(calc, clusterResource,
+              app.getFairShare(), app.getDemand());
+          Resource resForApp = Resources.max(calc, clusterResource,
+              Resources.none(), Resources.subtract(target, app.getResourceUsage()));
+          deficit = Resources.add(deficit, resForApp);
+        }
+      }
+      if (Resources.greaterThan(calc, clusterResource, deficit, Resources.none())) {
+        resDueToFairShare = deficit;
+      }
+    }
+
     if (Resources.greaterThan(calc, clusterResource, deficit,
         Resources.none())) {
       String message = "Should preempt " + deficit + " res for queue "
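With the queue-level deficit at zero, resourceDeficit() now accumulates a per-app deficit of max(0, min(fairShare, demand) - usage) for every runnable app starved longer than fairShareTimeout. Worked on the numbers from the test below (plain megabytes, memory only; a sketch, not the patch's Resource arithmetic):

  // Per-app contribution to the queue's preemption deficit.
  static long appDeficitMb(long fairShareMb, long demandMb, long usageMb) {
    long targetMb = Math.min(fairShareMb, demandMb);
    return Math.max(0L, targetMb - usageMb);
  }

In queueB after the 12-second tick, app02 is never considered (its timestamp keeps resetting while its usage of 3072mb exceeds its threshold share) and would contribute max(0, min(1536, 4096) - 3072) = 0 anyway; app03 contributes min(1536, 4096) - 0 = 1536. The queue's deficit is therefore 1536mb, matching the assertEquals(1536, ...) in the test.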
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 2f48380..1038692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2125,7 +2125,129 @@ public void testPreemptionDecision() throws Exception {
     assertEquals(
         1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
   }
+
+  @Test
+  /**
+   * Tests the timing of the decision to preempt tasks within a queue.
+   */
+  public void testPreemptionDecisionWithinQueue() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<maxResources>3072mb,3vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<maxResources>3072mb,3vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create four nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
+            "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 4,
+            "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // One app in each of queues A and B requests 4 containers of 1024mb
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+
+    // Now a new app in queue B requests 4 more containers
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+
+    scheduler.update();
+
+    FSLeafQueue schedA =
+        scheduler.getQueueManager().getLeafQueue("queueA", true);
+    FSLeafQueue schedB =
+        scheduler.getQueueManager().getLeafQueue("queueB", true);
+
+    assertTrue(Resources.equals(
+        Resources.none(), scheduler.resourceDeficit(schedA, clock.getTime())));
+    assertTrue(Resources.equals(
+        Resources.none(), scheduler.resourceDeficit(schedB, clock.getTime())));
+
+    // After fairSharePreemptionTimeout has passed, the starved app should
+    // trigger preemption even though both queues are at their fair share.
+    clock.tickSec(12);
+    // Fair share and min share are now satisfied for both queues, but one
+    // app is still starved:
+    //             fairshare   usage    starved
+    //   queueA    3072mb      3072mb   no
+    //   queueB    3072mb      3072mb   no
+    //   app01     3072mb      3072mb   no
+    //   app02     1536mb      3072mb   no
+    //   app03     1536mb      0mb      yes
+    assertTrue(Resources.equals(
+        Resources.none(), scheduler.resourceDeficit(schedA, clock.getTime())));
+    assertEquals(
+        1536, scheduler.resourceDeficit(schedB, clock.getTime()).getMemory());
+  }
 
   @Test
   /**
    * Tests the timing of decision to preempt tasks.