diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java new file mode 100644 index 00000000000..6169e51744b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.metrics; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Metrics for Opportunistic Scheduler. + */ +@InterfaceAudience.Private +@Metrics(context="yarn") +public class OpportunisticSchedulerMetrics { + // CHECKSTYLE:OFF:VisibilityModifier + private static AtomicBoolean isInitialized = new AtomicBoolean(false); + + private static final MetricsInfo RECORD_INFO = + info("OpportunisticSchedulerMetrics", + "Metrics for the Yarn Opportunistic Scheduler"); + + private static volatile OpportunisticSchedulerMetrics INSTANCE = null; + private static MetricsRegistry registry; + + public static OpportunisticSchedulerMetrics getMetrics() { + if(!isInitialized.get()){ + synchronized (OpportunisticSchedulerMetrics.class) { + if(INSTANCE == null){ + INSTANCE = new OpportunisticSchedulerMetrics(); + registerMetrics(); + isInitialized.set(true); + } + } + } + return INSTANCE; + } + + private static void registerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ResourceManager"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("OpportunisticSchedulerMetrics", + "Metrics for the Yarn Opportunistic Scheduler", INSTANCE); + } + } + + @Metric("# of allocated opportunistic containers") + MutableGaugeInt allocatedOContainers; + @Metric("Aggregate # of allocated opportunistic containers") + MutableCounterLong aggregateOContainersAllocated; + @Metric("Aggregate # of released opportunistic containers") + MutableCounterLong aggregateOContainersReleased; + + @Metric("Aggregate # of allocated node-local opportunistic containers") + MutableCounterLong aggregateNodeLocalOContainersAllocated; + @Metric("Aggregate # of allocated rack-local opportunistic containers") + MutableCounterLong aggregateRackLocalOContainersAllocated; + @Metric("Aggregate # of allocated off-switch opportunistic containers") + MutableCounterLong aggregateOffSwitchOContainersAllocated; + + @VisibleForTesting + public int getAllocatedContainers() { + return allocatedOContainers.value(); + } + + @VisibleForTesting + public long getAggregatedAllocatedContainers() { + return aggregateOContainersAllocated.value(); + } + + @VisibleForTesting + public long getAggregatedReleasedContainers() { + return aggregateOContainersReleased.value(); + } + + @VisibleForTesting + public long getAggregatedNodeLocalContainers() { + return aggregateNodeLocalOContainersAllocated.value(); + } + + @VisibleForTesting + public long getAggregatedRackLocalContainers() { + return aggregateRackLocalOContainersAllocated.value(); + } + + @VisibleForTesting + public long getAggregatedOffSwitchContainers() { + return aggregateOffSwitchOContainersAllocated.value(); + } + + // Opportunistic Containers + public void incrAllocatedOppContainers(int numContainers) { + allocatedOContainers.incr(numContainers); + aggregateOContainersAllocated.incr(numContainers); + } + + public void incrReleasedOppContainers(int numContainers) { + aggregateOContainersReleased.incr(numContainers); + allocatedOContainers.decr(numContainers); + } + + public void incrNodeLocalOppContainers() { + aggregateNodeLocalOContainersAllocated.incr(); + } + + public void incrRackLocalOppContainers() { + aggregateRackLocalOContainersAllocated.incr(); + } + + public void incrOffSwitchOppContainers() { + aggregateOffSwitchOContainersAllocated.incr(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 1cec3dac11b..5600aa80dbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -433,6 +434,7 @@ private void allocateContainersInternal(long rmIdentifier, idCounter, id, userName, allocations, location, anyAsk, rNode); numAllocated++; + updateMetrics(loopIndex); // Try to spread the allocations across the nodes. // But don't add if it is a node local request. if (loopIndex != NODE_LOCAL_LOOP) { @@ -459,6 +461,18 @@ private void allocateContainersInternal(long rmIdentifier, } } + private void updateMetrics(int loopIndex) { + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + if (loopIndex == NODE_LOCAL_LOOP) { + metrics.incrNodeLocalOppContainers(); + } else if (loopIndex == RACK_LOCAL_LOOP) { + metrics.incrRackLocalOppContainers(); + } else { + metrics.incrOffSwitchOppContainers(); + } + } + private Collection findNodeCandidates(int loopIndex, Map allNodes, Set blackList, EnrichedResourceRequest enrichedRR) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index f6d23f71b7d..9e861bd3a39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -200,6 +201,9 @@ public void allocate(ApplicationAttemptId appAttemptId, // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { + OpportunisticSchedulerMetrics schedulerMetrics = + OpportunisticSchedulerMetrics.getMetrics(); + schedulerMetrics.incrAllocatedOppContainers(oppContainers.size()); handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); ApplicationMasterServiceUtils.addToAllocatedContainers( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 92dde949647..2c43b9594f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; @@ -690,6 +691,7 @@ public void completedContainer(RMContainer rmContainer, if (node != null) { node.releaseContainer(rmContainer.getContainerId(), false); } + OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1); } // If the container is getting killed in ACQUIRED state, the requester (AM diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 221181d19c2..dbbc9b1e988 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -752,6 +753,108 @@ private void verifyMetrics(QueueMetrics metrics, long availableMB, Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers()); } + @Test(timeout = 60000) + public void testOpportunisticSchedulerMetrics() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + + int allocContainers = metrics.getAllocatedContainers(); + long aggrAllocatedContainers = metrics.getAggregatedAllocatedContainers(); + long aggrOffSwitchContainers = metrics.getAggregatedOffSwitchContainers(); + long aggrReleasedContainers = metrics.getAggregatedReleasedContainers(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + Thread.sleep(1000); + + AllocateResponse allocateResponse = am1.allocate(Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); + + List allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + + Assert.assertEquals(allocContainers + 2, metrics.getAllocatedContainers()); + Assert.assertEquals(aggrAllocatedContainers + 2, + metrics.getAggregatedAllocatedContainers()); + Assert.assertEquals(aggrOffSwitchContainers + 2, + metrics.getAggregatedOffSwitchContainers()); + + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + rm.drainEvents(); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Container Completed in the NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), + true); + rm.drainEvents(); + + // Verify that container has been removed.. + rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertNull(rmContainer); + + Assert.assertEquals(allocContainers + 1, metrics.getAllocatedContainers()); + Assert.assertEquals(aggrReleasedContainers + 1, + metrics.getAggregatedReleasedContainers()); + } + @Test(timeout = 60000) public void testAMCrashDuringAllocate() throws Exception { MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());