diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 05745ec272e..d8ba70c2cf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; @@ -1216,6 +1217,7 @@ void stopActiveServices() { void reinitialize(boolean initialize) { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); + CapacitySchedulerMetrics.destroy(); if (initialize) { resetRMContext(); createAndInitActiveServices(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 776e512d433..01f6baa2628 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -191,6 +191,8 @@ private CSConfigurationProvider csConfProvider; + private static CapacitySchedulerMetrics csMetrics; + @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -384,6 +386,9 @@ void initScheduler(Configuration configuration) throws // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + // Init scheduler metrics (shared instance, registered on first use) + csMetrics = CapacitySchedulerMetrics.getMetrics(); + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" @@ -1238,6 +1243,7 @@ public QueueInfo getQueueInfo(String queueName, @Override protected void nodeUpdate(RMNode rmNode) { + long begin = System.nanoTime(); try { readLock.lock(); setLastNodeUpdateTime(Time.now()); @@ -1265,6 +1271,9 @@ protected void nodeUpdate(RMNode rmNode) { writeLock.unlock(); } } + + long latency = System.nanoTime() - begin; + csMetrics.addNodeUpdate(latency); } /** @@ -1629,17 +1638,28 @@ CSAssignment allocateContainersToNode( return null; } + long startTime = System.nanoTime(); + // Backward compatible way to make sure previous behavior which allocation // driven by node heartbeat works. 
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // We have two different logics to handle allocation on single node / multi // nodes. + CSAssignment assignment; if (null != node) { - return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + assignment = allocateContainerOnSingleNode(candidates, + node, withNodeHeartbeat); } else{ - return allocateContainersOnMultiNodes(candidates); + assignment = allocateContainersOnMultiNodes(candidates); + } + + if (assignment != null && assignment.getAssignmentInformation() != null + && assignment.getAssignmentInformation().getNumAllocations() > 0) { + long allocateTime = System.nanoTime() - startTime; + csMetrics.addAllocate(allocateTime); } + return assignment; } @Override @@ -2792,6 +2812,7 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, @Override public boolean tryCommit(Resource cluster, ResourceCommitRequest r, boolean updatePending) { + long commitStart = System.nanoTime(); ResourceCommitRequest request = (ResourceCommitRequest) r; @@ -2830,9 +2851,13 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r, if (app != null && attemptId.equals(app.getApplicationAttemptId())) { if (app.accept(cluster, request, updatePending) && app.apply(cluster, request, updatePending)) { + long commitSuccess = System.nanoTime() - commitStart; + csMetrics.addCommitSuccess(commitSuccess); LOG.info("Allocation proposal accepted"); isSuccess = true; } else{ + long commitFailed = System.nanoTime() - commitStart; + csMetrics.addCommitFailure(commitFailed); LOG.info("Failed to accept allocation proposal"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java new file mode 100644 index 00000000000..c2b0cddbf7f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Metrics for capacity scheduler: allocate, commit and node-update rates. + */ +@InterfaceAudience.Private +@Metrics(context="yarn") +public class CapacitySchedulerMetrics { + + private static AtomicBoolean isInitialized = new AtomicBoolean(false); + + private static final MetricsInfo RECORD_INFO = + info("CapacitySchedulerMetrics", + "Metrics for the Yarn Capacity Scheduler"); + + @Metric("Scheduler allocate containers") MutableRate allocate; + @Metric("Scheduler commit success") MutableRate commitSuccess; + @Metric("Scheduler commit failure") MutableRate commitFailure; + @Metric("Scheduler node update") MutableRate nodeUpdate; + + private static volatile CapacitySchedulerMetrics INSTANCE = null; + private static MetricsRegistry registry; + + public static CapacitySchedulerMetrics getMetrics() { + if(!isInitialized.get()){ + synchronized (CapacitySchedulerMetrics.class) { + if(INSTANCE == null){ + INSTANCE = new CapacitySchedulerMetrics(); + registerMetrics(); + isInitialized.set(true); + } + } + } + return INSTANCE; + } + + private static void registerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ResourceManager"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("CapacitySchedulerMetrics", + "Metrics for the Yarn Capacity Scheduler", 
INSTANCE); + } + } + + @VisibleForTesting + public synchronized static void destroy() { + isInitialized.set(false); + INSTANCE = null; + } + + public void addAllocate(long latency) { + this.allocate.add(latency); + } + + public void addCommitSuccess(long latency) { + this.commitSuccess.add(latency); + } + + public void addCommitFailure(long latency) { + this.commitFailure.add(latency); + } + + public void addNodeUpdate(long latency) { + this.nodeUpdate.add(latency); + } + + @VisibleForTesting + public long getNumOfNodeUpdate() { + return this.nodeUpdate.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumOfAllocates() { + return this.allocate.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumOfCommitSuccess() { + return this.commitSuccess.lastStat().numSamples(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java new file mode 100644 index 00000000000..eaa966afac3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +/** + * Test class for CS metrics (node update, allocate and commit counters). 
+ */ +public class TestCapacitySchedulerMetrics { + + private MockRM rm; + + @Test + public void testCSMetrics() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 2048); + MockNM nm2 = rm.registerNode("host2:1234", 2048); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + CapacitySchedulerMetrics csMetrics = CapacitySchedulerMetrics.getMetrics(); + Assert.assertNotNull(csMetrics); + try { + GenericTestUtils.waitFor(() + -> csMetrics.getNumOfNodeUpdate() == 2, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on node-update events."); + } + + Assert.assertEquals(0, csMetrics.getNumOfAllocates()); + Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess()); + + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, + "default", 1, null, null, false); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + am.allocate("*", 1024, 1, new ArrayList<>()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // Verify HB metrics updated (2 earlier + 2 new node heartbeats) + try { + GenericTestUtils.waitFor(() + -> csMetrics.getNumOfNodeUpdate() == 4, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on node-update events."); + } + + // For async mode, the number of allocs might be bigger than 1 + Assert.assertTrue(csMetrics.getNumOfAllocates() > 0); + // But there will be only 2 successful commits (1 AM + 1 task) + Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess()); + } + 
+ @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } +}