diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 97f55c9f4b3..3e258f2c562 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1171,6 +1171,7 @@ void stopActiveServices() { void reinitialize(boolean initialize) { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); + getResourceScheduler().resetSchedulerMetrics(); if (initialize) { resetRMContext(); createAndInitActiveServices(true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index fd50f203d9b..30273d07438 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1420,4 +1420,9 @@ protected void triggerUpdate() { updateThreadMonitor.notify(); } } + + @Override + public void resetSchedulerMetrics() { + // reset scheduler metrics + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index d96d62545c8..5620a9bbdfd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -58,4 +58,9 @@ * @return the number of available {@link NodeId} by resource name. */ List getNodeIds(String resourceName); + + /** + * Reset scheduler metrics. + */ + void resetSchedulerMetrics(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0ecc785ef7b..426aa9cdf38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1046,6 +1046,7 @@ public QueueInfo getQueueInfo(String queueName, @Override protected void nodeUpdate(RMNode rmNode) { + long begin = System.nanoTime(); try { readLock.lock(); setLastNodeUpdateTime(Time.now()); @@ -1073,6 +1074,9 @@ protected void nodeUpdate(RMNode rmNode) { writeLock.unlock(); } } + + long latency = System.nanoTime() - begin; + CapacitySchedulerMetrics.getMetrics().addNodeUpdate(latency); } /** @@ -1426,17 +1430,28 @@ CSAssignment allocateContainersToNode(PlacementSet ps, return null; } + long startTime = System.nanoTime(); + // Backward compatible way to make sure previous behavior which allocation // driven by node heartbeat works. FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); // We have two different logics to handle allocation on single node / multi // nodes. + CSAssignment assignment; if (null != node) { - return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat); + assignment = allocateContainerOnSingleNode(ps, + node, withNodeHeartbeat); } else { - return allocateContainersOnMultiNodes(ps); + assignment = allocateContainersOnMultiNodes(ps); + } + + if (assignment != null && assignment.getAssignmentInformation() != null + && assignment.getAssignmentInformation().getNumAllocations() > 0) { + long allocateTime = System.nanoTime() - startTime; + CapacitySchedulerMetrics.getMetrics().addAllocate(allocateTime); } + return assignment; } @Override @@ -2533,6 +2548,7 @@ public void submitResourceCommitRequest(Resource cluster, @Override public void tryCommit(Resource cluster, ResourceCommitRequest r) { + long commitStart = System.nanoTime(); ResourceCommitRequest request = (ResourceCommitRequest) r; @@ -2569,8 +2585,14 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) { // and proposal queue was not be consumed in time if (app != null && attemptId.equals(app.getApplicationAttemptId())) { if (app.accept(cluster, request) && app.apply(cluster, request)) { + long commitSuccess = System.nanoTime() - commitStart; + CapacitySchedulerMetrics.getMetrics() + .addCommitSuccess(commitSuccess); LOG.info("Allocation proposal accepted"); } else{ + long commitFailed = System.nanoTime() - commitStart; + CapacitySchedulerMetrics.getMetrics() + .addCommitFailure(commitFailed); LOG.info("Failed to accept allocation proposal"); } @@ -2719,4 +2741,9 @@ public MutableConfigurationProvider getMutableConfProvider() { } return null; } + + @Override + public void resetSchedulerMetrics() { + CapacitySchedulerMetrics.destroy(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java new file mode 100644 index 00000000000..5f8988b0778 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Metrics for capacity scheduler. + */ +@InterfaceAudience.Private +@Metrics(context="yarn") +public class CapacitySchedulerMetrics { + + private static AtomicBoolean isInitialized = new AtomicBoolean(false); + + private static final MetricsInfo RECORD_INFO = + info("CapacitySchedulerMetrics", + "Metrics for the Yarn Capacity Scheduler"); + + @Metric("Scheduler allocate containers") MutableRate allocate; + @Metric("Scheduler commit success") MutableRate commitSuccess; + @Metric("Scheduler commit failure") MutableRate commitFailure; + @Metric("Scheduler node update") MutableRate nodeUpdate; + + private static volatile CapacitySchedulerMetrics INSTANCE = null; + private static MetricsRegistry registry; + + public static CapacitySchedulerMetrics getMetrics() { + if(!isInitialized.get()){ + synchronized (CapacitySchedulerMetrics.class) { + if(INSTANCE == null){ + INSTANCE = new CapacitySchedulerMetrics(); + registerMetrics(); + isInitialized.set(true); + } + } + } + return INSTANCE; + } + + private static void registerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ResourceManager"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("CapacitySchedulerMetrics", + "Metrics for the Yarn Capacity Scheduler", INSTANCE); + } + } + + @VisibleForTesting + public synchronized static void destroy() { + isInitialized.set(false); + INSTANCE = null; + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.unregisterSource("CapacitySchedulerMetrics"); + } + } + + public void addAllocate(long latency) { + this.allocate.add(latency); + } + + public void addCommitSuccess(long latency) { + this.commitSuccess.add(latency); + } + + public void addCommitFailure(long latency) { + this.commitFailure.add(latency); + } + + public void addNodeUpdate(long latency) { + this.nodeUpdate.add(latency); + } + + @VisibleForTesting + public long getNumOfNodeUpdate() { + return this.nodeUpdate.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumOfAllocates() { + return this.allocate.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumOfCommitSuccess() { + return this.commitSuccess.lastStat().numSamples(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java new file mode 100644 index 00000000000..e0a3e17d23e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.base.Supplier; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +/** + * Test class for CS metrics. + */ +public class TestCapacitySchedulerMetrics { + + private MockRM rm; + + @Test + public void testCSMetrics() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 2048); + MockNM nm2 = rm.registerNode("host2:1234", 2048); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + final CapacitySchedulerMetrics csMetrics = CapacitySchedulerMetrics.getMetrics(); + Assert.assertNotNull(csMetrics); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return csMetrics.getNumOfNodeUpdate() == 2; + } + }, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on node-update events."); + } + + Assert.assertEquals(0, csMetrics.getNumOfAllocates()); + Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess()); + + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, + "default", 1, null, null, false); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + am.allocate("*", 1024, 1, new ArrayList()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // Verify HB metrics updated + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return csMetrics.getNumOfNodeUpdate() == 4; + } + }, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on node-update events."); + } + + // For async mode, the number of alloc might be bigger than 1 + Assert.assertTrue(csMetrics.getNumOfAllocates() > 0); + // But there will be only 2 successful commit (1 AM + 1 task) + Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess()); + } + + @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } +}