diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java new file mode 100644 index 00000000000..3571051f461 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+@Metrics(about = "Metrics for AMRMProxy", context = "fedr")
+public final class AMRMProxyMetrics {
+
+ private static final MetricsInfo RECORD_INFO =
+ info("AMRMProxyMetrics", "Metrics for the AMRMProxy");
+ @Metric("# of failed applications start requests") private MutableGaugeLong
+ failedAppStartRequests;
+ @Metric("# of failed register AM requests") private MutableGaugeLong
+ failedRegisterAMRequests;
+ @Metric("# of failed finish AM requests") private MutableGaugeLong
+ failedFinishAMRequests;
+ @Metric("# of failed allocate requests ") private MutableGaugeLong
+ failedAllocateRequests;
+ @Metric("# of failed application recoveries") private MutableGaugeLong
+ failedAppRecoveryCount;
+ // Aggregate metrics are shared, and don't have to be looked up per call
+ @Metric("Application start request latency(ms)") private MutableRate
+ totalSucceededAppStartRequests;
+ @Metric("Register application master latency(ms)") private MutableRate
+ totalSucceededRegisterAMRequests;
+ @Metric("Finish application master latency(ms)") private MutableRate
+ totalSucceededFinishAMRequests;
+ @Metric("Allocate latency(ms)") private MutableRate
+ totalSucceededAllocateRequests;
+ // Quantile latency in ms - this is needed for SLA (95%, 99%, etc)
+ private MutableQuantiles applicationStartLatency;
+ private MutableQuantiles registerAMLatency;
+ private MutableQuantiles finishAMLatency;
+ private MutableQuantiles allocateLatency;
+ private static volatile AMRMProxyMetrics instance = null;
+ private MetricsRegistry registry;
+
+ private AMRMProxyMetrics() {
+ registry = new MetricsRegistry(RECORD_INFO);
+ registry.tag(RECORD_INFO, "AMRMProxy");
+
+ applicationStartLatency = registry
+ .newQuantiles("applicationStartLatency", "latency of app start", "ops",
+ "latency", 10);
+ registerAMLatency = registry
+ .newQuantiles("registerAMLatency", "latency of register AM", "ops",
+ "latency", 10);
+ finishAMLatency = registry
+ .newQuantiles("finishAMLatency", "latency of finish AM", "ops",
+ "latency", 10);
+ allocateLatency = registry
+ .newQuantiles("allocateLatency", "latency of allocate", "ops",
+ "latency", 10);
+ }
+
+ /**
+ * Initialize the singleton instance.
+ *
+ * @return the singleton
+ */
+ public static AMRMProxyMetrics getMetrics() {
+ synchronized (AMRMProxyMetrics.class) {
+ if (instance == null) {
+ instance = DefaultMetricsSystem.instance()
+ .register("AMRMProxyMetrics", "Metrics for the Yarn AMRMProxy",
+ new AMRMProxyMetrics());
+ }
+ }
+ return instance;
+ }
+
+ @VisibleForTesting public long getNumSucceededAppStartRequests() {
+ return totalSucceededAppStartRequests.lastStat().numSamples();
+ }
+
+ @VisibleForTesting public double getLatencySucceededAppStartRequests() {
+ return totalSucceededAppStartRequests.lastStat().mean();
+ }
+
+ public void succeededAppStartRequests(long duration) {
+ totalSucceededAppStartRequests.add(duration);
+ applicationStartLatency.add(duration);
+ }
+
+ @VisibleForTesting public long getNumSucceededRegisterAMRequests() {
+ return totalSucceededRegisterAMRequests.lastStat().numSamples();
+ }
+
+ @VisibleForTesting public double getLatencySucceededRegisterAMRequests() {
+ return totalSucceededRegisterAMRequests.lastStat().mean();
+ }
+
+ public void succeededRegisterAMRequests(long duration) {
+ totalSucceededRegisterAMRequests.add(duration);
+ registerAMLatency.add(duration);
+ }
+
+ @VisibleForTesting public long getNumSucceededFinishAMRequests() {
+ return totalSucceededFinishAMRequests.lastStat().numSamples();
+ }
+
+ @VisibleForTesting public double getLatencySucceededFinishAMRequests() {
+ return totalSucceededFinishAMRequests.lastStat().mean();
+ }
+
+ public void succeededFinishAMRequests(long duration) {
+ totalSucceededFinishAMRequests.add(duration);
+ finishAMLatency.add(duration);
+ }
+
+ @VisibleForTesting public long getNumSucceededAllocateRequests() {
+ return totalSucceededAllocateRequests.lastStat().numSamples();
+ }
+
+ @VisibleForTesting public double getLatencySucceededAllocateRequests() {
+ return totalSucceededAllocateRequests.lastStat().mean();
+ }
+
+ public void succeededAllocateRequests(long duration) {
+ totalSucceededAllocateRequests.add(duration);
+ allocateLatency.add(duration);
+ }
+
+ public long getFailedAppStartRequests() {
+ return failedAppStartRequests.value();
+ }
+
+ public void incrFailedAppStartRequests() {
+ failedAppStartRequests.incr();
+ }
+
+ public long getFailedRegisterAMRequests() {
+ return failedRegisterAMRequests.value();
+ }
+
+ public void incrFailedRegisterAMRequests() {
+ failedRegisterAMRequests.incr();
+ }
+
+ public long getFailedFinishAMRequests() {
+ return failedFinishAMRequests.value();
+ }
+
+ public void incrFailedFinishAMRequests() {
+ failedFinishAMRequests.incr();
+ }
+
+ public long getFailedAllocateRequests() {
+ return failedAllocateRequests.value();
+ }
+
+ public void incrFailedAllocateRequests() {
+ failedAllocateRequests.incr();
+ }
+
+ public long getFailedAppRecoveryCount() {
+ return failedAppRecoveryCount.value();
+ }
+
+ public void incrFailedAppRecoveryCount() {
+ failedAppRecoveryCount.incr();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index 52f33135084..d3c4a1d5288 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -75,7 +75,9 @@
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +99,7 @@
private static final String NMSS_USER_KEY = "user";
private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken";
+ private final Clock clock = new MonotonicClock();
private Server server;
private final Context nmContext;
private final AsyncDispatcher dispatcher;
@@ -104,6 +107,7 @@
private AMRMProxyTokenSecretManager secretManager;
private Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestAMRMProxyMetrics.class);
+ private static AMRMProxyMetrics metrics;
+
+ @BeforeClass public static void init() {
+ metrics = AMRMProxyMetrics.getMetrics();
+ LOG.info("Test: aggregate metrics are initialized correctly");
+
+ Assert.assertEquals(0, metrics.getFailedAppStartRequests());
+ Assert.assertEquals(0, metrics.getFailedRegisterAMRequests());
+ Assert.assertEquals(0, metrics.getFailedFinishAMRequests());
+ Assert.assertEquals(0, metrics.getFailedAllocateRequests());
+
+ Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests());
+ Assert
+ .assertEquals(0, metrics.getNumSucceededRegisterAMRequests());
+ Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests());
+ Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests());
+
+ LOG.info("Test: aggregate metrics are updated correctly");
+ }
+
+ @Test public void testAllocateRequestWithNullValues() throws Exception {
+ long failedAppStartRequests = metrics.getFailedAppStartRequests();
+ long failedRegisterAMRequests =
+ metrics.getFailedRegisterAMRequests();
+ long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
+ long failedAllocateRequests = metrics.getFailedAllocateRequests();
+
+ long succeededAppStartRequests =
+ metrics.getNumSucceededAppStartRequests();
+ long succeededRegisterAMRequests =
+ metrics.getNumSucceededRegisterAMRequests();
+ long succeededFinishAMRequests =
+ metrics.getNumSucceededFinishAMRequests();
+ long succeededAllocateRequests =
+ metrics.getNumSucceededAllocateRequests();
+
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ Assert
+ .assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
+
+ AllocateResponse allocateResponse = allocate(testAppId);
+ Assert.assertNotNull(allocateResponse);
+
+ FinishApplicationMasterResponse finshResponse =
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+ Assert.assertEquals(failedAppStartRequests,
+ metrics.getFailedAppStartRequests());
+ Assert.assertEquals(failedRegisterAMRequests,
+ metrics.getFailedRegisterAMRequests());
+ Assert.assertEquals(failedFinishAMRequests,
+ metrics.getFailedFinishAMRequests());
+ Assert.assertEquals(failedAllocateRequests,
+ metrics.getFailedAllocateRequests());
+
+ Assert.assertEquals(succeededAppStartRequests,
+ metrics.getNumSucceededAppStartRequests());
+ Assert.assertEquals(1 + succeededRegisterAMRequests,
+ metrics.getNumSucceededRegisterAMRequests());
+ Assert.assertEquals(1 + succeededFinishAMRequests,
+ metrics.getNumSucceededFinishAMRequests());
+ Assert.assertEquals(1 + succeededAllocateRequests,
+ metrics.getNumSucceededAllocateRequests());
+ }
+
+ @Test public void testFinishOneApplicationMasterWithFailure()
+ throws Exception {
+ long failedAppStartRequests = metrics.getFailedAppStartRequests();
+ long failedRegisterAMRequests =
+ metrics.getFailedRegisterAMRequests();
+ long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
+ long failedAllocateRequests = metrics.getFailedAllocateRequests();
+
+ long succeededAppStartRequests =
+ metrics.getNumSucceededAppStartRequests();
+ long succeededRegisterAMRequests =
+ metrics.getNumSucceededRegisterAMRequests();
+ long succeededFinishAMRequests =
+ metrics.getNumSucceededFinishAMRequests();
+ long succeededAllocateRequests =
+ metrics.getNumSucceededAllocateRequests();
+
+ int testAppId = 1;
+ RegisterApplicationMasterResponse registerResponse =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(registerResponse);
+ Assert
+ .assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
+
+ FinishApplicationMasterResponse finshResponse =
+ finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+ Assert.assertNotNull(finshResponse);
+
+ try {
+ // Try to finish an application master that is already finished.
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+ Assert
+ .fail("The request to finish application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("Finish registration failed as expected because it was not "
+ + "registered");
+ }
+
+ Assert.assertEquals(failedAppStartRequests,
+ metrics.getFailedAppStartRequests());
+ Assert.assertEquals(failedRegisterAMRequests,
+ metrics.getFailedRegisterAMRequests());
+ Assert.assertEquals(1 + failedFinishAMRequests,
+ metrics.getFailedFinishAMRequests());
+ Assert.assertEquals(failedAllocateRequests,
+ metrics.getFailedAllocateRequests());
+
+ Assert.assertEquals(succeededAppStartRequests,
+ metrics.getNumSucceededAppStartRequests());
+ Assert.assertEquals(1 + succeededRegisterAMRequests,
+ metrics.getNumSucceededRegisterAMRequests());
+ Assert.assertEquals(1 + succeededFinishAMRequests,
+ metrics.getNumSucceededFinishAMRequests());
+ Assert.assertEquals(succeededAllocateRequests,
+ metrics.getNumSucceededAllocateRequests());
+ }
+}