diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3c97535..d22b069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -371,6 +371,10 @@ public RMContainer preemptContainer() { // Preempt from the selected app if (candidateSched != null) { toBePreempted = candidateSched.preemptContainer(); + if (toBePreempted != null) { + getMetrics().incrContainerUnderPreemption(toBePreempted + .getAllocatedResource()); + } } return toBePreempted; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 82c422b..9e30c32 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -23,7 +23,12 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import 
org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGauge; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.metrics2.lib.MutableStat; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -39,7 +44,27 @@ @Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores; @Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB; @Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores; - + + // Preemption related metrics + // Preemptee side metrics + @Metric("Containers killed due to preemption") MutableCounterInt preemptedContainersCummulative; + @Metric("CPU vcores lost due to preemption") MutableCounterLong preemptedCpuVcoresCummulative; + @Metric("Memory MB lost due to preemption") MutableCounterLong preemptedMemoryMBCummulative; + + @Metric("Containers under preemption") MutableGaugeInt underPreemptionContainers; + @Metric("CPU vcores currently under preemption") MutableGaugeInt underPreemptionCpuVcores; + @Metric("Memory MB currently under preemption") MutableGaugeInt underPreemptionMemoryMB; + + // Preemptor side metrics + @Metric("Memory MB waiting for fair share preemption") MutableGaugeInt fairPreemptionAskMemoryMB; + @Metric("CPU vcores waiting for fair share preemption") MutableGaugeInt fairPreemptionAskCpuVcores; + + @Metric("Memory MB waiting for min share preemption") MutableGaugeInt minPreemptionAskMemoryMB; + @Metric("CPU vcores waiting for min share preemption") MutableGaugeInt minPreemptionAskCpuVcores; + + @Metric("Number of times queue was starved of min share ") MutableStat minSharedStarvedTime; + @Metric("Number of times 
queue was starved of fair share ") MutableStat fairSharedStarvedTime; + FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { super(ms, queueName, parent, enableUserMetrics, conf); @@ -118,4 +143,42 @@ static FSQueueMetrics forQueue(String queueName, Queue parent, return (FSQueueMetrics)metrics; } + public void incrContainersKilledDueToPreemption(Resource resource) { + preemptedContainersCummulative.incr(); + preemptedCpuVcoresCummulative.incr(resource.getVirtualCores()); + preemptedMemoryMBCummulative.incr(resource.getMemory()); + underPreemptionContainers.decr(); + underPreemptionCpuVcores.decr(resource.getVirtualCores()); + underPreemptionMemoryMB.decr(resource.getMemory()); + } + + public void setMinSharePreemptionAsk(Resource resDueToMinShare) { + minPreemptionAskCpuVcores.set(resDueToMinShare.getVirtualCores()); + minPreemptionAskMemoryMB.set(resDueToMinShare.getMemory()); + } + + public void setFairSharePreemptionAsk(Resource fairSharePreemptionAsk) { + fairPreemptionAskCpuVcores.set(fairSharePreemptionAsk.getVirtualCores()); + fairPreemptionAskMemoryMB.set(fairSharePreemptionAsk.getMemory()); + } + + public void incrContainerUnderPreemption(Resource resourceUnderPreemption) { + underPreemptionContainers.incr(); + underPreemptionCpuVcores.incr(resourceUnderPreemption.getVirtualCores()); + underPreemptionMemoryMB.incr(resourceUnderPreemption.getMemory()); + } + + public void incrMinShareStarvedTime(long additionalTimeStarved) { + minSharedStarvedTime.add(additionalTimeStarved); + } + + public void incrFairShareStarvedTime(long additionalTimeStarved) { + fairSharedStarvedTime.add(additionalTimeStarved); + } + + public void setMinSharePreemptionCurrentWait(long timeStarvedOfMinShare) { + } + + public void setFairSharePreemptionCurrentWait(long timeStarvedOfFairShare) { + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1ace604..3e0bd75 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -356,11 +356,13 @@ protected synchronized void preemptTasksIfNecessary() { if (curTime - lastPreemptCheckTime < preemptionInterval) { return; } + long prevLastPreemptCheckTime = lastPreemptCheckTime; lastPreemptCheckTime = curTime; Resource resToPreempt = Resources.clone(Resources.none()); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); + Resources.addTo(resToPreempt, resToPreempt(sched, curTime, + prevLastPreemptCheckTime)); } if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { @@ -450,6 +452,9 @@ protected void warnOrKillContainer(RMContainer container) { SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + queue.getMetrics().incrContainersKilledDueToPreemption(container + .getContainer().getResource()); + recoverResourceRequestForContainer(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). 
@@ -474,23 +479,41 @@ protected void warnOrKillContainer(RMContainer container) { * max of the two amounts (this shouldn't happen unless someone sets the * timeouts to be identical for some reason). */ - protected Resource resToPreempt(FSLeafQueue sched, long curTime) { + protected Resource resToPreempt(FSLeafQueue sched, long curTime, + long lastPreemptCheckTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); long fairShareTimeout = sched.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); - if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { + final long timeStarvedOfMinShare = + curTime - sched.getLastTimeAtMinShare() - minShareTimeout; + if (timeStarvedOfMinShare > 0) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + + long timeStarvedSinceLastCheck = Math.min(curTime - lastPreemptCheckTime, + timeStarvedOfMinShare); + sched.getMetrics().setMinSharePreemptionCurrentWait(timeStarvedOfMinShare); + sched.getMetrics().incrMinShareStarvedTime(timeStarvedSinceLastCheck); } - if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { + final long timeStarvedOfFairShare = + curTime - sched.getLastTimeAtFairShareThreshold() - fairShareTimeout; + if (timeStarvedOfFairShare > 0) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + + long timeStarvedSinceLastCheck = Math.min(curTime - lastPreemptCheckTime, + timeStarvedOfFairShare); + sched.getMetrics().setFairSharePreemptionCurrentWait + (timeStarvedOfFairShare); + 
sched.getMetrics().incrFairShareStarvedTime(timeStarvedSinceLastCheck); } + sched.getMetrics().setMinSharePreemptionAsk(resDueToMinShare); + sched.getMetrics().setFairSharePreemptionAsk(resDueToFairShare); Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare); if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,