diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
new file mode 100644
index 00000000000..3571051f461
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Metrics for the AMRMProxy. For app start, register AM, finish AM and
+ * allocate requests it keeps a gauge of failures plus a rate and quantiles of
+ * the latency of succeeded requests; failed app recoveries are also counted.
+ * The shared instance is obtained through {@link #getMetrics()}.
+ */
+@Metrics(about = "Metrics for AMRMProxy", context = "fedr")
+public final class AMRMProxyMetrics {
+
+  private static final MetricsInfo RECORD_INFO =
+      info("AMRMProxyMetrics", "Metrics for the AMRMProxy");
+
+  // Singleton; volatile so getMetrics() can read it without locking.
+  private static volatile AMRMProxyMetrics instance = null;
+
+  @Metric("# of failed applications start requests")
+  private MutableGaugeLong failedAppStartRequests;
+  @Metric("# of failed register AM requests")
+  private MutableGaugeLong failedRegisterAMRequests;
+  @Metric("# of failed finish AM requests")
+  private MutableGaugeLong failedFinishAMRequests;
+  @Metric("# of failed allocate requests ")
+  private MutableGaugeLong failedAllocateRequests;
+  @Metric("# of failed application recoveries")
+  private MutableGaugeLong failedAppRecoveryCount;
+
+  // Aggregate metrics are shared, and don't have to be looked up per call
+  @Metric("Application start request latency(ms)")
+  private MutableRate totalSucceededAppStartRequests;
+  @Metric("Register application master latency(ms)")
+  private MutableRate totalSucceededRegisterAMRequests;
+  @Metric("Finish application master latency(ms)")
+  private MutableRate totalSucceededFinishAMRequests;
+  @Metric("Allocate latency(ms)")
+  private MutableRate totalSucceededAllocateRequests;
+
+  // Quantile latency in ms - this is needed for SLA (95%, 99%, etc)
+  private final MutableQuantiles applicationStartLatency;
+  private final MutableQuantiles registerAMLatency;
+  private final MutableQuantiles finishAMLatency;
+  private final MutableQuantiles allocateLatency;
+
+  private final MetricsRegistry registry;
+
+  private AMRMProxyMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "AMRMProxy");
+
+    applicationStartLatency = registry
+        .newQuantiles("applicationStartLatency", "latency of app start", "ops",
+            "latency", 10);
+    registerAMLatency = registry
+        .newQuantiles("registerAMLatency", "latency of register AM", "ops",
+            "latency", 10);
+    finishAMLatency = registry
+        .newQuantiles("finishAMLatency", "latency of finish AM", "ops",
+            "latency", 10);
+    allocateLatency = registry
+        .newQuantiles("allocateLatency", "latency of allocate", "ops",
+            "latency", 10);
+  }
+
+  /**
+   * Returns the singleton, creating it and registering it with the default
+   * metrics system on first use.
+   *
+   * @return the singleton
+   */
+  public static AMRMProxyMetrics getMetrics() {
+    AMRMProxyMetrics current = instance;
+    if (current == null) {
+      synchronized (AMRMProxyMetrics.class) {
+        current = instance;
+        if (current == null) {
+          current = DefaultMetricsSystem.instance()
+              .register("AMRMProxyMetrics", "Metrics for the Yarn AMRMProxy",
+                  new AMRMProxyMetrics());
+          instance = current;
+        }
+      }
+    }
+    return current;
+  }
+
+  /** Returns the sample count reported by the succeeded app start rate. */
+  @VisibleForTesting
+  public long getNumSucceededAppStartRequests() {
+    return totalSucceededAppStartRequests.lastStat().numSamples();
+  }
+
+  /** Returns the mean reported by the succeeded app start rate. */
+  @VisibleForTesting
+  public double getLatencySucceededAppStartRequests() {
+    return totalSucceededAppStartRequests.lastStat().mean();
+  }
+
+  /**
+   * Records a succeeded app start request in the rate and quantile metrics.
+   *
+   * @param duration latency of the request in ms
+   */
+  public void succeededAppStartRequests(long duration) {
+    totalSucceededAppStartRequests.add(duration);
+    applicationStartLatency.add(duration);
+  }
+
+  /** Returns the sample count reported by the succeeded register AM rate. */
+  @VisibleForTesting
+  public long getNumSucceededRegisterAMRequests() {
+    return totalSucceededRegisterAMRequests.lastStat().numSamples();
+  }
+
+  /** Returns the mean reported by the succeeded register AM rate. */
+  @VisibleForTesting
+  public double getLatencySucceededRegisterAMRequests() {
+    return totalSucceededRegisterAMRequests.lastStat().mean();
+  }
+
+  /**
+   * Records a succeeded register AM request in the rate and quantile metrics.
+   *
+   * @param duration latency of the request in ms
+   */
+  public void succeededRegisterAMRequests(long duration) {
+    totalSucceededRegisterAMRequests.add(duration);
+    registerAMLatency.add(duration);
+  }
+
+  /** Returns the sample count reported by the succeeded finish AM rate. */
+  @VisibleForTesting
+  public long getNumSucceededFinishAMRequests() {
+    return totalSucceededFinishAMRequests.lastStat().numSamples();
+  }
+
+  /** Returns the mean reported by the succeeded finish AM rate. */
+  @VisibleForTesting
+  public double getLatencySucceededFinishAMRequests() {
+    return totalSucceededFinishAMRequests.lastStat().mean();
+  }
+
+  /**
+   * Records a succeeded finish AM request in the rate and quantile metrics.
+   *
+   * @param duration latency of the request in ms
+   */
+  public void succeededFinishAMRequests(long duration) {
+    totalSucceededFinishAMRequests.add(duration);
+    finishAMLatency.add(duration);
+  }
+
+  /** Returns the sample count reported by the succeeded allocate rate. */
+  @VisibleForTesting
+  public long getNumSucceededAllocateRequests() {
+    return totalSucceededAllocateRequests.lastStat().numSamples();
+  }
+
+  /** Returns the mean reported by the succeeded allocate rate. */
+  @VisibleForTesting
+  public double getLatencySucceededAllocateRequests() {
+    return totalSucceededAllocateRequests.lastStat().mean();
+  }
+
+  /**
+   * Records a succeeded allocate request in the rate and quantile metrics.
+   *
+   * @param duration latency of the request in ms
+   */
+  public void succeededAllocateRequests(long duration) {
+    totalSucceededAllocateRequests.add(duration);
+    allocateLatency.add(duration);
+  }
+
+  /** Returns the value of the failed app start requests gauge. */
+  public long getFailedAppStartRequests() {
+    return failedAppStartRequests.value();
+  }
+
+  /** Increments the failed app start requests gauge. */
+  public void incrFailedAppStartRequests() {
+    failedAppStartRequests.incr();
+  }
+
+  /** Returns the value of the failed register AM requests gauge. */
+  public long getFailedRegisterAMRequests() {
+    return failedRegisterAMRequests.value();
+  }
+
+  /** Increments the failed register AM requests gauge. */
+  public void incrFailedRegisterAMRequests() {
+    failedRegisterAMRequests.incr();
+  }
+
+  /** Returns the value of the failed finish AM requests gauge. */
+  public long getFailedFinishAMRequests() {
+    return failedFinishAMRequests.value();
+  }
+
+  /** Increments the failed finish AM requests gauge. */
+  public void incrFailedFinishAMRequests() {
+    failedFinishAMRequests.incr();
+  }
+
+  /** Returns the value of the failed allocate requests gauge. */
+  public long getFailedAllocateRequests() {
+    return failedAllocateRequests.value();
+  }
+
+  /** Increments the failed allocate requests gauge. */
+  public void incrFailedAllocateRequests() {
+    failedAllocateRequests.incr();
+  }
+
+  /** Returns the value of the failed application recoveries gauge. */
+  public long getFailedAppRecoveryCount() {
+    return failedAppRecoveryCount.value();
+  }
+
+  /** Increments the failed application recoveries gauge. */
+  public void incrFailedAppRecoveryCount() {
+    failedAppRecoveryCount.incr();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index 52f33135084..d3c4a1d5288 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -75,7 +75,9 @@
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +99,7 @@
   private static final String NMSS_USER_KEY = "user";
   private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken";
 
+  private final Clock clock = new MonotonicClock();
   private Server server;
   private final Context nmContext;
   private final AsyncDispatcher dispatcher;
@@ -104,6 +107,7 @@
   private AMRMProxyTokenSecretManager secretManager;
   private Map applPipelineMap;
   private RegistryOperations registry;
+  private AMRMProxyMetrics metrics;
 
   /**
    * Creates an instance of the service.
@@ -122,6 +126,8 @@ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
 
     this.dispatcher.register(ApplicationEventType.class,
         new ApplicationEventHandler());
+
+    metrics = AMRMProxyMetrics.getMetrics();
   }
 
   @Override
@@ -272,6 +278,7 @@ public void recover() throws IOException {
       } catch (Throwable e) {
         LOG.error("Exception when recovering " + attemptId
             + ", removing it from NMStateStore and move on", e);
+        this.metrics.incrFailedAppRecoveryCount();
         this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
       }
     }
@@ -286,13 +293,26 @@ public void recover() throws IOException {
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
-    LOG.info("Registering application master." + " Host:"
-        + request.getHost() + " Port:" + request.getRpcPort()
-        + " Tracking Url:" + request.getTrackingUrl());
-    RequestInterceptorChainWrapper pipeline =
-        authorizeAndGetInterceptorChain();
-    return pipeline.getRootInterceptor()
-        .registerApplicationMaster(request);
+    long startTime = clock.getTime();
+    try {
+      RequestInterceptorChainWrapper pipeline =
+          authorizeAndGetInterceptorChain();
+      LOG.info("Registering application master. Host:{} Port:{} "
+          + "Tracking Url:{} for application {}", request.getHost(),
+          request.getRpcPort(), request.getTrackingUrl(),
+          pipeline.getApplicationAttemptId());
+      RegisterApplicationMasterResponse response =
+          pipeline.getRootInterceptor().registerApplicationMaster(request);
+
+      long endTime = clock.getTime();
+      this.metrics.succeededRegisterAMRequests(endTime - startTime);
+      LOG.info("RegisterAM processing finished in {} ms for application {}",
+          endTime - startTime, pipeline.getApplicationAttemptId());
+      return response;
+    } catch (Throwable t) {
+      this.metrics.incrFailedRegisterAMRequests();
+      throw t;
+    }
   }
 
   /**
@@ -304,11 +324,25 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request) throws YarnException,
       IOException {
-    LOG.info("Finishing application master. Tracking Url:"
-        + request.getTrackingUrl());
-    RequestInterceptorChainWrapper pipeline =
-        authorizeAndGetInterceptorChain();
-    return pipeline.getRootInterceptor().finishApplicationMaster(request);
+    long startTime = clock.getTime();
+    try {
+      RequestInterceptorChainWrapper pipeline =
+          authorizeAndGetInterceptorChain();
+      LOG.info("Finishing application master for {}. Tracking Url: {}",
+          pipeline.getApplicationAttemptId(), request.getTrackingUrl());
+      FinishApplicationMasterResponse response =
+          pipeline.getRootInterceptor().finishApplicationMaster(request);
+
+      long endTime = clock.getTime();
+      this.metrics.succeededFinishAMRequests(endTime - startTime);
+      LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}",
+          response.getIsUnregistered(), endTime - startTime,
+          pipeline.getApplicationAttemptId());
+      return response;
+    } catch (Throwable t) {
+      this.metrics.incrFailedFinishAMRequests();
+      throw t;
+    }
   }
 
   /**
@@ -321,16 +355,26 @@ public FinishApplicationMasterResponse finishApplicationMaster(
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
-    AMRMTokenIdentifier amrmTokenIdentifier =
-        YarnServerSecurityUtils.authorizeRequest();
-    RequestInterceptorChainWrapper pipeline =
-        getInterceptorChain(amrmTokenIdentifier);
-    AllocateResponse allocateResponse =
-        pipeline.getRootInterceptor().allocate(request);
-
-    updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
-
-    return allocateResponse;
+    long startTime = clock.getTime();
+    try {
+      AMRMTokenIdentifier amrmTokenIdentifier =
+          YarnServerSecurityUtils.authorizeRequest();
+      RequestInterceptorChainWrapper pipeline =
+          getInterceptorChain(amrmTokenIdentifier);
+      AllocateResponse allocateResponse =
+          pipeline.getRootInterceptor().allocate(request);
+
+      updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
+
+      long endTime = clock.getTime();
+      this.metrics.succeededAllocateRequests(endTime - startTime);
+      LOG.info("Allocate processing finished in {} ms for application {}",
+          endTime - startTime, pipeline.getApplicationAttemptId());
+      return allocateResponse;
+    } catch (Throwable t) {
+      this.metrics.incrFailedAllocateRequests();
+      throw t;
+    }
   }
 
   /**
@@ -343,40 +387,47 @@ public AllocateResponse allocate(AllocateRequest request)
    */
   public void processApplicationStartRequest(StartContainerRequest request)
       throws IOException, YarnException {
-    LOG.info("Callback received for initializing request "
-        + "processing pipeline for an AM");
-    ContainerTokenIdentifier containerTokenIdentifierForKey =
-        BuilderUtils.newContainerTokenIdentifier(request
-            .getContainerToken());
-    ApplicationAttemptId appAttemptId =
-        containerTokenIdentifierForKey.getContainerID()
-            .getApplicationAttemptId();
-    Credentials credentials =
-        YarnServerSecurityUtils.parseCredentials(request
-            .getContainerLaunchContext());
-
-    Token amrmToken =
-        getFirstAMRMToken(credentials.getAllTokens());
-    if (amrmToken == null) {
-      throw new YarnRuntimeException(
-          "AMRMToken not found in the start container request for application:"
-              + appAttemptId.toString());
-    }
-
-    // Substitute the existing AMRM Token with a local one. Keep the rest of the
-    // tokens in the credentials intact.
-    Token localToken =
-        this.secretManager.createAndGetAMRMToken(appAttemptId);
-    credentials.addToken(localToken.getService(), localToken);
-
-    DataOutputBuffer dob = new DataOutputBuffer();
-    credentials.writeTokenStorageToStream(dob);
-    request.getContainerLaunchContext().setTokens(
-        ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+    long startTime = clock.getTime();
+    try {
+      LOG.info("Callback received for initializing request "
+          + "processing pipeline for an AM");
+      ContainerTokenIdentifier containerTokenIdentifierForKey =
+          BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
+      ApplicationAttemptId appAttemptId =
+          containerTokenIdentifierForKey.getContainerID()
+              .getApplicationAttemptId();
+      Credentials credentials = YarnServerSecurityUtils
+          .parseCredentials(request.getContainerLaunchContext());
+
+      Token amrmToken =
+          getFirstAMRMToken(credentials.getAllTokens());
+      if (amrmToken == null) {
+        throw new YarnRuntimeException(
+            "AMRMToken not found in the start container request for "
+                + "application:" + appAttemptId.toString());
+      }
 
-    initializePipeline(appAttemptId,
-        containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
-        localToken, null, false, credentials);
+      // Substitute the existing AMRM Token with a local one. Keep the rest of
+      // the tokens in the credentials intact.
+      Token localToken =
+          this.secretManager.createAndGetAMRMToken(appAttemptId);
+      credentials.addToken(localToken.getService(), localToken);
+
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      request.getContainerLaunchContext()
+          .setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+      initializePipeline(appAttemptId,
+          containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
+          localToken, null, false, credentials);
+
+      long endTime = clock.getTime();
+      this.metrics.succeededAppStartRequests(endTime - startTime);
+    } catch (Throwable t) {
+      this.metrics.incrFailedAppStartRequests();
+      throw t;
+    }
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
new file mode 100644
index 00000000000..f7cadd1bf32
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that {@link AMRMProxyMetrics} is updated when register, allocate and
+ * finish requests are issued through the test harness.
+ */
+public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestAMRMProxyMetrics.class);
+  private static AMRMProxyMetrics metrics;
+
+  @BeforeClass
+  public static void init() {
+    metrics = AMRMProxyMetrics.getMetrics();
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0, metrics.getFailedAppStartRequests());
+    Assert.assertEquals(0, metrics.getFailedRegisterAMRequests());
+    Assert.assertEquals(0, metrics.getFailedFinishAMRequests());
+    Assert.assertEquals(0, metrics.getFailedAllocateRequests());
+
+    Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests());
+    Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests());
+    Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests());
+    Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * Registers, allocates and finishes one AM and expects exactly one more
+   * succeeded register, allocate and finish sample and no new failures.
+   */
+  @Test
+  public void testAllocateRequestWithNullValues() throws Exception {
+    long failedAppStartRequests = metrics.getFailedAppStartRequests();
+    long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
+    long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
+    long failedAllocateRequests = metrics.getFailedAllocateRequests();
+
+    long succeededAppStartRequests =
+        metrics.getNumSucceededAppStartRequests();
+    long succeededRegisterAMRequests =
+        metrics.getNumSucceededRegisterAMRequests();
+    long succeededFinishAMRequests =
+        metrics.getNumSucceededFinishAMRequests();
+    long succeededAllocateRequests =
+        metrics.getNumSucceededAllocateRequests();
+
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    AllocateResponse allocateResponse = allocate(testAppId);
+    Assert.assertNotNull(allocateResponse);
+
+    FinishApplicationMasterResponse finishResponse =
+        finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finishResponse);
+    Assert.assertTrue(finishResponse.getIsUnregistered());
+
+    Assert.assertEquals(failedAppStartRequests,
+        metrics.getFailedAppStartRequests());
+    Assert.assertEquals(failedRegisterAMRequests,
+        metrics.getFailedRegisterAMRequests());
+    Assert.assertEquals(failedFinishAMRequests,
+        metrics.getFailedFinishAMRequests());
+    Assert.assertEquals(failedAllocateRequests,
+        metrics.getFailedAllocateRequests());
+
+    Assert.assertEquals(succeededAppStartRequests,
+        metrics.getNumSucceededAppStartRequests());
+    Assert.assertEquals(1 + succeededRegisterAMRequests,
+        metrics.getNumSucceededRegisterAMRequests());
+    Assert.assertEquals(1 + succeededFinishAMRequests,
+        metrics.getNumSucceededFinishAMRequests());
+    Assert.assertEquals(1 + succeededAllocateRequests,
+        metrics.getNumSucceededAllocateRequests());
+  }
+
+  /**
+   * Finishes the same AM twice and expects the second attempt to be rejected
+   * and counted as a failed finish request.
+   */
+  @Test
+  public void testFinishOneApplicationMasterWithFailure() throws Exception {
+    long failedAppStartRequests = metrics.getFailedAppStartRequests();
+    long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
+    long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
+    long failedAllocateRequests = metrics.getFailedAllocateRequests();
+
+    long succeededAppStartRequests =
+        metrics.getNumSucceededAppStartRequests();
+    long succeededRegisterAMRequests =
+        metrics.getNumSucceededRegisterAMRequests();
+    long succeededFinishAMRequests =
+        metrics.getNumSucceededFinishAMRequests();
+    long succeededAllocateRequests =
+        metrics.getNumSucceededAllocateRequests();
+
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finishResponse =
+        finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+    Assert.assertNotNull(finishResponse);
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+      Assert.fail(
+          "The request to finish application master should have failed");
+    } catch (Exception ex) {
+      // Expected. Catch Exception rather than Throwable so the AssertionError
+      // raised by Assert.fail() above is not swallowed here.
+      LOG.info("Finish registration failed as expected because it was not "
+          + "registered");
+    }
+
+    Assert.assertEquals(failedAppStartRequests,
+        metrics.getFailedAppStartRequests());
+    Assert.assertEquals(failedRegisterAMRequests,
+        metrics.getFailedRegisterAMRequests());
+    Assert.assertEquals(1 + failedFinishAMRequests,
+        metrics.getFailedFinishAMRequests());
+    Assert.assertEquals(failedAllocateRequests,
+        metrics.getFailedAllocateRequests());
+
+    Assert.assertEquals(succeededAppStartRequests,
+        metrics.getNumSucceededAppStartRequests());
+    Assert.assertEquals(1 + succeededRegisterAMRequests,
+        metrics.getNumSucceededRegisterAMRequests());
+    Assert.assertEquals(1 + succeededFinishAMRequests,
+        metrics.getNumSucceededFinishAMRequests());
+    Assert.assertEquals(succeededAllocateRequests,
+        metrics.getNumSucceededAllocateRequests());
+  }
+}