diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java index 6169e51744b..5135cad584d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java @@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +56,7 @@ public static OpportunisticSchedulerMetrics getMetrics() { if(INSTANCE == null){ INSTANCE = new OpportunisticSchedulerMetrics(); registerMetrics(); + INSTANCE.initialize(); isInitialized.set(true); } } @@ -62,6 +64,13 @@ public static OpportunisticSchedulerMetrics getMetrics() { return INSTANCE; } + private void initialize() { + allocateLatencyOQuantiles = + registry.newQuantiles("AllocateLatencyOpportunistic", + "Latency to allocate opportunistic containers", "ops", + "latency", 5); + } + private static void registerMetrics() { registry = new MetricsRegistry(RECORD_INFO); registry.tag(RECORD_INFO, "ResourceManager"); @@ -86,6 +95,8 @@ private static void registerMetrics() { @Metric("Aggregate # of allocated off-switch opportunistic containers") MutableCounterLong aggregateOffSwitchOContainersAllocated; + MutableQuantiles allocateLatencyOQuantiles; + @VisibleForTesting public int getAllocatedContainers() { return allocatedOContainers.value(); @@ -138,4 +149,8 @@ public void incrRackLocalOppContainers() { public void incrOffSwitchOppContainers() { aggregateOffSwitchOContainersAllocated.incr(); } + + public void addAllocateOLatencyEntry(long latency) { + allocateLatencyOQuantiles.add(latency); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 10c24022daa..102a49d2ffb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -241,9 +242,15 @@ String getResourceName() { private final Map nodeLocations = new HashMap<>(); private final Map rackLocations = new HashMap<>(); private final ResourceRequest request; + private final long timestamp; EnrichedResourceRequest(ResourceRequest request) { this.request = request; + timestamp = SystemClock.getInstance().getTime(); + } + + long getTimestamp() { + return timestamp; } ResourceRequest getRequest() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 246d450668d..1ccb9950c7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.server.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.util.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,7 +194,14 @@ public void matchAllocationToOutstandingRequest(Resource capability, err.removeLocation(allocation.getResourceName()); } } + getOppSchedulerMetrics().addAllocateOLatencyEntry( + SystemClock.getInstance().getTime() - err.getTimestamp()); } } } + + @VisibleForTesting + OpportunisticSchedulerMetrics getOppSchedulerMetrics() { + return OpportunisticSchedulerMetrics.getMetrics(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 57e397d0102..3ab7c300112 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -47,6 +48,14 @@ import java.util.List; import java.util.Set; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestOpportunisticContainerAllocator { private static final Logger LOG = @@ -829,4 +838,48 @@ public void testMaxAllocationsPerAMHeartbeatWithHighLimit() // all containers should be allocated in single heartbeat. Assert.assertEquals(20, containers.size()); } + + @Test + public void testAllocationLatencyMetrics() throws Exception { + oppCntxt = spy(oppCntxt); + OpportunisticSchedulerMetrics metrics = + mock(OpportunisticSchedulerMetrics.class); + when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics); + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h6", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r3", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + LOG.info("Containers: {}", containers); + Assert.assertEquals(2, containers.size()); + verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong()); + } }