diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3c97535..d22b069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -371,6 +371,10 @@ public RMContainer preemptContainer() { // Preempt from the selected app if (candidateSched != null) { toBePreempted = candidateSched.preemptContainer(); + if (toBePreempted != null) { + getMetrics().incrContainerUnderPreemption(toBePreempted + .getAllocatedResource()); + } } return toBePreempted; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 82c422b..6d168d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -23,7 +23,13 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import 
org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGauge; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.metrics2.lib.MutableStat; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -39,7 +45,29 @@ @Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores; @Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB; @Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores; - + + // Preemption related metrics + // Preemptee side metrics + @Metric("Containers killed due to preemption") MutableCounterInt preemptedContainersCummulative; + @Metric("CPU vcores lost due to preemption") MutableCounterLong preemptedCpuVcoresCummulative; + @Metric("Memory MB lost due to preemption") MutableCounterLong preemptedMemoryMBCummulative; + + @Metric("Containers under preemption") MutableGaugeInt underPreemptionContainers; + @Metric("CPU vcores currently under preemption") MutableGaugeInt underPreemptionCpuVcores; + @Metric("Memory MB currently under preemption") MutableGaugeInt underPreemptionMemoryMB; + + // Preemptor side metrics + @Metric("Memory MB waiting for fair share preemption") MutableGaugeInt fairPreemptionAskMemoryMB; + @Metric("CPU vcores waiting for fair share preemption") MutableGaugeInt fairPreemptionAskCpuVcores; + + @Metric("Memory MB waiting for min share preemption") MutableGaugeInt minPreemptionAskMemoryMB; + @Metric("CPU vcores waiting for min share preemption") MutableGaugeInt minPreemptionAskCpuVcores; + + @Metric("Amount of time queue was starved of min share ") 
MutableStat minSharedStarvedTime; + @Metric("Amount of time queue was starved of fair share ") MutableStat fairSharedStarvedTime; + @Metric("Current amount of time queue was starved of min share ") MutableGaugeLong minSharedStarvedCurrentTime; + @Metric("Current amount of time queue was starved of fair share ") MutableGaugeLong fairSharedStarvedCurrentTime; + FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { super(ms, queueName, parent, enableUserMetrics, conf); @@ -118,4 +146,44 @@ static FSQueueMetrics forQueue(String queueName, Queue parent, return (FSQueueMetrics)metrics; } + public void incrContainersKilledDueToPreemption(Resource resource) { + preemptedContainersCummulative.incr(); + preemptedCpuVcoresCummulative.incr(resource.getVirtualCores()); + preemptedMemoryMBCummulative.incr(resource.getMemory()); + underPreemptionContainers.decr(); + underPreemptionCpuVcores.decr(resource.getVirtualCores()); + underPreemptionMemoryMB.decr(resource.getMemory()); + } + + public void setMinSharePreemptionAsk(Resource resDueToMinShare) { + minPreemptionAskCpuVcores.set(resDueToMinShare.getVirtualCores()); + minPreemptionAskMemoryMB.set(resDueToMinShare.getMemory()); + } + + public void setFairSharePreemptionAsk(Resource fairSharePreemptionAsk) { + fairPreemptionAskCpuVcores.set(fairSharePreemptionAsk.getVirtualCores()); + fairPreemptionAskMemoryMB.set(fairSharePreemptionAsk.getMemory()); + } + + public void incrContainerUnderPreemption(Resource resourceUnderPreemption) { + underPreemptionContainers.incr(); + underPreemptionCpuVcores.incr(resourceUnderPreemption.getVirtualCores()); + underPreemptionMemoryMB.incr(resourceUnderPreemption.getMemory()); + } + + public void incrMinShareStarvedTime(long additionalTimeStarved) { + minSharedStarvedTime.add(additionalTimeStarved); + } + + public void incrFairShareStarvedTime(long additionalTimeStarved) { + fairSharedStarvedTime.add(additionalTimeStarved); + } + + public void 
setMinSharePreemptionCurrentWait(long timeStarvedOfMinShare) { + minSharedStarvedCurrentTime.set(timeStarvedOfMinShare); + } + + public void setFairSharePreemptionCurrentWait(long timeStarvedOfFairShare) { + fairSharedStarvedCurrentTime.set(timeStarvedOfFairShare); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1ace604..3e0bd75 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -356,11 +356,13 @@ protected synchronized void preemptTasksIfNecessary() { if (curTime - lastPreemptCheckTime < preemptionInterval) { return; } + long prevLastPreemptCheckTime = lastPreemptCheckTime; lastPreemptCheckTime = curTime; Resource resToPreempt = Resources.clone(Resources.none()); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); + Resources.addTo(resToPreempt, resToPreempt(sched, curTime, + prevLastPreemptCheckTime)); } if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { @@ -450,6 +452,9 @@ protected void warnOrKillContainer(RMContainer container) { SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + queue.getMetrics().incrContainersKilledDueToPreemption(container + .getContainer().getResource()); + recoverResourceRequestForContainer(container); // TODO: Not 
sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). @@ -474,23 +479,41 @@ protected void warnOrKillContainer(RMContainer container) { * max of the two amounts (this shouldn't happen unless someone sets the * timeouts to be identical for some reason). */ - protected Resource resToPreempt(FSLeafQueue sched, long curTime) { + protected Resource resToPreempt(FSLeafQueue sched, long curTime, + long lastPreemptCheckTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); long fairShareTimeout = sched.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); - if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { + final long timeStarvedOfMinShare = + curTime - sched.getLastTimeAtMinShare() - minShareTimeout; + if (timeStarvedOfMinShare > 0) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + + long timeStarvedSinceLastCheck = Math.min(curTime - lastPreemptCheckTime, + timeStarvedOfMinShare); + sched.getMetrics().setMinSharePreemptionCurrentWait(timeStarvedOfMinShare); + sched.getMetrics().incrMinShareStarvedTime(timeStarvedSinceLastCheck); } - if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { + final long timeStarvedOfFairShare = + curTime - sched.getLastTimeAtFairShareThreshold() - fairShareTimeout; + if (timeStarvedOfFairShare > 0) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + + long timeStarvedSinceLastCheck = Math.min(curTime - lastPreemptCheckTime, + 
timeStarvedOfFairShare); + sched.getMetrics().setFairSharePreemptionCurrentWait + (timeStarvedOfFairShare); + sched.getMetrics().incrFairShareStarvedTime(timeStarvedSinceLastCheck); } + sched.getMetrics().setMinSharePreemptionAsk(resDueToMinShare); + sched.getMetrics().setFairSharePreemptionAsk(resDueToFairShare); Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare); if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5d079a3..cd64d8f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1689,8 +1689,9 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { clock.tick(11); scheduler.update(); + long anyTime = 0; Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() - .getLeafQueue("queueA.queueA2", false), clock.getTime()); + .getLeafQueue("queueA.queueA2", false), clock.getTime(), anyTime); assertEquals(3277, toPreempt.getMemory()); // verify if the 3 containers required by queueA2 are preempted in the same @@ -1811,26 +1812,33 @@ public void testPreemptionDecision() throws Exception { FSLeafQueue schedD = scheduler.getQueueManager().getLeafQueue("queueD", true); + long anyTime = 0; assertTrue(Resources.equals( - Resources.none(), 
scheduler.resToPreempt(schedC, clock.getTime()))); + Resources.none(), scheduler.resToPreempt(schedC, clock.getTime(), + anyTime))); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(schedD, clock.getTime()))); + Resources.none(), scheduler.resToPreempt(schedD, clock.getTime(), + anyTime))); // After minSharePreemptionTime has passed, they should want to preempt min // share. clock.tick(6); assertEquals( - 1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(schedC, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(schedD, clock.getTime(), anyTime) + .getMemory()); // After fairSharePreemptionTime has passed, they should want to preempt // fair share. scheduler.update(); clock.tick(6); assertEquals( - 1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); + 1536 , scheduler.resToPreempt(schedC, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(schedD, clock.getTime(), anyTime) + .getMemory()); } @Test @@ -1946,72 +1954,94 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception { FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true); FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true); + long anyTime = 0; assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime()))); + Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime(), + anyTime))); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime()))); + Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime(), + anyTime))); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(queueC, clock.getTime()))); + Resources.none(), scheduler.resToPreempt(queueC, clock.getTime(), + 
anyTime))); // After 5 seconds, queueB1 wants to preempt min share scheduler.update(); clock.tick(6); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueB1, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 0, scheduler.resToPreempt(queueB2, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 0, scheduler.resToPreempt(queueC, clock.getTime(), anyTime) + .getMemory()); // After 10 seconds, queueB2 wants to preempt min share scheduler.update(); clock.tick(5); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueB1, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueB2, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 0, scheduler.resToPreempt(queueC, clock.getTime(), anyTime) + .getMemory()); // After 15 seconds, queueC wants to preempt min share scheduler.update(); clock.tick(5); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueB1, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueB2, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueC, clock.getTime(), anyTime) + .getMemory()); // After 20 seconds, queueB2 should want to preempt fair share scheduler.update(); clock.tick(5); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueB1, 
clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(queueB2, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueC, clock.getTime(), anyTime) + .getMemory()); // After 25 seconds, queueB1 should want to preempt fair share scheduler.update(); clock.tick(5); assertEquals( - 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(queueB1, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(queueB2, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1024, scheduler.resToPreempt(queueC, clock.getTime(), anyTime) + .getMemory()); // After 30 seconds, queueC should want to preempt fair share scheduler.update(); clock.tick(5); assertEquals( - 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(queueB1, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(queueB2, clock.getTime(), anyTime) + .getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1536, scheduler.resToPreempt(queueC, clock.getTime(), anyTime) + .getMemory()); } @Test