diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java index 5f53629..be442ed 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java @@ -21,14 +21,18 @@ import java.util.Iterator; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsFilter; import static org.apache.hadoop.metrics2.lib.Interns.*; -class MetricsCollectorImpl implements MetricsCollector, +@InterfaceAudience.Private +@VisibleForTesting +public class MetricsCollectorImpl implements MetricsCollector, Iterable { private final List rbs = Lists.newArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPerfMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPerfMetrics.java new file mode 100644 index 0000000..dcad104 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPerfMetrics.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Class to capture the performance metrics of FairScheduler. + * This should be a singleton. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(context="yarn") +public class FSPerfMetrics implements MetricsSource { + + @Metric("Duration for a continuous scheduling run") + MutableGaugeLong continuousSchedulingRunDuration; + + @Metric("Duration to handle a node update") + MutableGaugeLong nodeUpdateDuration; + + @Metric("Duration for a update thread run") + MutableGaugeLong updateThreadRunDuration; + + @Metric("Duration for an update call") + MutableGaugeLong updateCallDuration; + + @Metric("Duration for a preempt call") + MutableGaugeLong preemptCallDuration; + + private static final MetricsInfo RECORD_INFO = + info("FSPerfMetrics", "Perf metrics for the FairScheduler"); + + private final MetricsRegistry registry; + private static final FSPerfMetrics INSTANCE = new FSPerfMetrics(); + + private FSPerfMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "FSPerfMetrics"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("FSPerfMetrics", "Perf metrics for the FairScheduler", this); + } + } + + public static FSPerfMetrics getInstance() { + return INSTANCE; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4e1c244..885a277 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -149,6 +149,7 @@ // Aggregate metrics FSQueueMetrics rootMetrics; + FSPerfMetrics perfMetrics; // Time when we last updated preemption vars protected long lastPreemptionUpdateTime; @@ -256,8 +257,11 @@ public void run() { while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(updateInterval); + long start = clock.getTime(); update(); preemptTasksIfNecessary(); + long duration = clock.getTime() - start; + perfMetrics.updateThreadRunDuration.set(duration); } catch (InterruptedException ie) { LOG.warn("Update thread interrupted. Exiting."); return; @@ -294,6 +298,7 @@ public void run() { * required resources per job. */ protected synchronized void update() { + long start = clock.getTime(); updatePreemptionVariables(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -317,6 +322,9 @@ protected synchronized void update() { " Demand: " + rootQueue.getDemand()); } } + + long duration = clock.getTime() - start; + perfMetrics.updateCallDuration.set(duration); } /** @@ -398,6 +406,7 @@ protected synchronized void preemptTasksIfNecessary() { * We make sure that no queue is placed below its fair share in the process. */ protected void preemptResources(Resource toPreempt) { + long start = clock.getTime(); if (Resources.equals(toPreempt, Resources.none())) { return; } @@ -448,6 +457,9 @@ protected void preemptResources(Resource toPreempt) { } } } + + long duration = clock.getTime() - start; + perfMetrics.preemptCallDuration.set(duration); } protected void warnOrKillContainer(RMContainer container) { @@ -960,6 +972,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, * Process a heartbeat update from a node. */ private synchronized void nodeUpdate(RMNode nm) { + long start = clock.getTime(); if (LOG.isDebugEnabled()) { LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } @@ -996,9 +1009,13 @@ private synchronized void nodeUpdate(RMNode nm) { } else { attemptScheduling(node); } + + long duration = clock.getTime() - start; + perfMetrics.nodeUpdateDuration.set(duration); } void continuousSchedulingAttempt() throws InterruptedException { + long start = clock.getTime(); List nodeIdList = new ArrayList(nodes.keySet()); // Sort the nodes by space available on them, so that we offer // containers on emptier nodes first, facilitating an even spread. This @@ -1021,6 +1038,9 @@ void continuousSchedulingAttempt() throws InterruptedException { ": " + ex.toString(), ex); } } + + long duration = clock.getTime() - start; + perfMetrics.continuousSchedulingRunDuration.set(duration); } /** Sort nodes by available resource */ @@ -1244,6 +1264,8 @@ private synchronized void initScheduler(Configuration conf) } rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + perfMetrics = FSPerfMetrics.getInstance(); + // This stores per-application scheduling information this.applications = new ConcurrentHashMap>(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 23c928c..1769f06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -3366,4 +3367,14 @@ public void testThreadLifeCycle() throws InterruptedException { assertNotEquals("One of the threads is still alive", 0, numRetries); } + + @Test + public void testPerfMetricsInited() { + scheduler.init(conf); + scheduler.start(); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + scheduler.perfMetrics.getMetrics(collector, true); + assertEquals("Incorrect number of perf metrics", 1, + collector.getRecords().size()); + } }