diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java index 5f53629..be442ed 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java @@ -21,14 +21,18 @@ import java.util.Iterator; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsFilter; import static org.apache.hadoop.metrics2.lib.Interns.*; -class MetricsCollectorImpl implements MetricsCollector, +@InterfaceAudience.Private +@VisibleForTesting +public class MetricsCollectorImpl implements MetricsCollector, Iterable { private final List rbs = Lists.newArrayList(); diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java index b8ba435..ba37757 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java @@ -90,6 +90,14 @@ public MutableStat(String name, String description, } /** + * Set whether to display the extended stats (stdev, min/max etc.) or not + * @param extended enable/disable displaying extended stats + */ + public synchronized void setExtended(boolean extended) { + this.extended = extended; + } + + /** * Add a number of samples and their sum to the running stat * @param numSamples number of samples * @param sum of the samples diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSDurations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSDurations.java new file mode 100644 index 0000000..0a90f78 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSDurations.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +import static org.apache.hadoop.metrics2.lib.Interns.info; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * Class to capture the performance metrics of FairScheduler. + * This should be a singleton. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(context="fairscheduler-durations") +public class FSDurations implements MetricsSource { + + @Metric("Duration for a continuous scheduling run") + MutableRate continuousSchedulingRun; + + @Metric("Duration to handle a node update") + MutableRate nodeUpdateCall; + + @Metric("Duration for a update thread run") + MutableRate updateThreadRun; + + @Metric("Duration for an update call") + MutableRate updateCall; + + @Metric("Duration for a preempt call") + MutableRate preemptCall; + + private static final MetricsInfo RECORD_INFO = + info("FSDurations", "Durations of FairScheduler calls or thread-runs"); + + private final MetricsRegistry registry; + + private boolean isExtended = false; + + private static final FSDurations INSTANCE = new FSDurations(); + + public static FSDurations getInstance(boolean isExtended) { + INSTANCE.setExtended(isExtended); + return INSTANCE; + } + + private FSDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "FSDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + private synchronized void setExtended(boolean isExtended) { + if (isExtended == INSTANCE.isExtended) + return; + + continuousSchedulingRun.setExtended(isExtended); + nodeUpdateCall.setExtended(isExtended); + updateThreadRun.setExtended(isExtended); + updateCall.setExtended(isExtended); + preemptCall.setExtended(isExtended); + + INSTANCE.isExtended = isExtended; + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addContinuousSchedulingRunDuration(long value) { + continuousSchedulingRun.add(value); + } + + public void addNodeUpdateDuration(long value) { + nodeUpdateCall.add(value); + } + + public void addUpdateThreadRunDuration(long value) { + updateThreadRun.add(value); + } + + public void addUpdateCallDuration(long value) { + updateCall.add(value); + } + + public void addPreemptCallDuration(long value) { + preemptCall.add(value); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4e1c244..0083681 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -149,6 +149,7 @@ // Aggregate metrics FSQueueMetrics rootMetrics; + FSDurations perfMetrics; // Time when we last updated preemption vars protected long lastPreemptionUpdateTime; @@ -256,8 +257,11 @@ public void run() { while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(updateInterval); + long start = clock.getTime(); update(); preemptTasksIfNecessary(); + long duration = clock.getTime() - start; + perfMetrics.addUpdateThreadRunDuration(duration); } catch (InterruptedException ie) { LOG.warn("Update thread interrupted. Exiting."); return; @@ -294,6 +298,7 @@ public void run() { * required resources per job. */ protected synchronized void update() { + long start = clock.getTime(); updatePreemptionVariables(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -317,6 +322,9 @@ protected synchronized void update() { " Demand: " + rootQueue.getDemand()); } } + + long duration = clock.getTime() - start; + perfMetrics.addUpdateCallDuration(duration); } /** @@ -352,7 +360,8 @@ boolean isStarvedForMinShare(FSLeafQueue sched) { * defined as being below half its fair share. */ boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, + Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, + clusterResource, Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); @@ -398,6 +407,7 @@ protected synchronized void preemptTasksIfNecessary() { * We make sure that no queue is placed below its fair share in the process. */ protected void preemptResources(Resource toPreempt) { + long start = clock.getTime(); if (Resources.equals(toPreempt, Resources.none())) { return; } @@ -448,6 +458,9 @@ protected void preemptResources(Resource toPreempt) { } } } + + long duration = clock.getTime() - start; + perfMetrics.addPreemptCallDuration(duration); } protected void warnOrKillContainer(RMContainer container) { @@ -659,7 +672,7 @@ protected synchronized void addApplicationAttempt( rmContext); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt(application - .getCurrentAppAttempt()); + .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); @@ -960,6 +973,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, * Process a heartbeat update from a node. */ private synchronized void nodeUpdate(RMNode nm) { + long start = clock.getTime(); if (LOG.isDebugEnabled()) { LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } @@ -996,9 +1010,13 @@ private synchronized void nodeUpdate(RMNode nm) { } else { attemptScheduling(node); } + + long duration = clock.getTime() - start; + perfMetrics.addNodeUpdateDuration(duration); } void continuousSchedulingAttempt() throws InterruptedException { + long start = clock.getTime(); List nodeIdList = new ArrayList(nodes.keySet()); // Sort the nodes by space available on them, so that we offer // containers on emptier nodes first, facilitating an even spread. This @@ -1021,6 +1039,9 @@ void continuousSchedulingAttempt() throws InterruptedException { ": " + ex.toString(), ex); } } + + long duration = clock.getTime() - start; + perfMetrics.addContinuousSchedulingRunDuration(duration); } /** Sort nodes by available resource */ @@ -1244,6 +1265,8 @@ private synchronized void initScheduler(Configuration conf) } rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + perfMetrics = FSDurations.getInstance(true); + // This stores per-application scheduling information this.applications = new ConcurrentHashMap>(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 23c928c..1769f06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -3366,4 +3367,14 @@ public void testThreadLifeCycle() throws InterruptedException { assertNotEquals("One of the threads is still alive", 0, numRetries); } + + @Test + public void testPerfMetricsInited() { + scheduler.init(conf); + scheduler.start(); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + scheduler.perfMetrics.getMetrics(collector, true); + assertEquals("Incorrect number of perf metrics", 1, + collector.getRecords().size()); + } }