diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java index 6169e51744b..585f8f1c0ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java @@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,6 +87,9 @@ private static void registerMetrics() { @Metric("Aggregate # of allocated off-switch opportunistic containers") MutableCounterLong aggregateOffSwitchOContainersAllocated; + @Metric("Aggregate latency for opportunistic container allocation") + MutableQuantiles allocateLatencyOQuantiles; + @VisibleForTesting public int getAllocatedContainers() { return allocatedOContainers.value(); @@ -138,4 +142,8 @@ public void incrRackLocalOppContainers() { public void incrOffSwitchOppContainers() { aggregateOffSwitchOContainersAllocated.incr(); } + + public void addAllocateOLatencyEntry(long latency) { + allocateLatencyOQuantiles.add(latency); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 10c24022daa..0ce19763803 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -241,9 +242,15 @@ String getResourceName() { private final Map nodeLocations = new HashMap<>(); private final Map rackLocations = new HashMap<>(); private final ResourceRequest request; + private final long timestamp; EnrichedResourceRequest(ResourceRequest request) { this.request = request; + timestamp = Time.monotonicNow(); + } + + long getTimestamp() { + return timestamp; } ResourceRequest getRequest() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 246d450668d..84e7bb9dce8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.server.scheduler; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,7 +194,14 @@ public void matchAllocationToOutstandingRequest(Resource capability, err.removeLocation(allocation.getResourceName()); } } + getOppSchedulerMetrics().addAllocateOLatencyEntry( + Time.monotonicNow() - err.getTimestamp()); } } } + + @VisibleForTesting + OpportunisticSchedulerMetrics getOppSchedulerMetrics() { + return OpportunisticSchedulerMetrics.getMetrics(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 57e397d0102..fd6abff72a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -43,10 +44,18 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestOpportunisticContainerAllocator { private static final Logger LOG = @@ -829,4 +838,51 @@ public void testMaxAllocationsPerAMHeartbeatWithHighLimit() // all containers should be allocated in single heartbeat. Assert.assertEquals(20, containers.size()); } + + /** + * Test opportunistic container allocation latency metrics. + * @throws Exception + */ + @Test + public void testAllocationLatencyMetrics() throws Exception { + oppCntxt = spy(oppCntxt); + OpportunisticSchedulerMetrics metrics = + mock(OpportunisticSchedulerMetrics.class); + when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics); + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + Collections.emptyList(), Collections.emptyList()); + final Priority priority = Priority.newInstance(1); + final Resource capability = Resources.createResource(1 * GB); + final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(priority, "*", + capability, 2, true, null, oppRequest), + ResourceRequest.newInstance(priority, "h6", + capability, 2, true, null, oppRequest), + ResourceRequest.newInstance(priority, "/r3", + capability, 2, true, null, oppRequest)); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + LOG.info("Containers: {}", containers); + Assert.assertEquals(2, containers.size()); + // for each allocated container, latency should be added. + verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong()); + } }