diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 6d75471518b..e7499cc10c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -51,6 +51,8 @@
   private MutableGaugeInt numAppsFailedRetrieved;
   @Metric("# of multiple applications reports failed to be retrieved")
   private MutableGaugeInt numMultipleAppsFailedRetrieved;
+  @Metric("# of applicationAttempt reports failed to be retrieved")
+  private MutableGaugeInt numAppAttemptsFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -64,6 +66,9 @@
   @Metric("Total number of successful Retrieved multiple apps reports and "
       + "latency(ms)")
   private MutableRate totalSucceededMultipleAppsRetrieved;
+  @Metric("Total number of successful Retrieved appAttempt reports and latency(ms)")
+  private MutableRate totalSucceededAppAttemptsRetrieved;
+
 
   /**
    * Provide quantile counters for all latencies.
@@ -73,6 +78,7 @@
   private MutableQuantiles killApplicationLatency;
   private MutableQuantiles getApplicationReportLatency;
   private MutableQuantiles getApplicationsReportLatency;
+  private MutableQuantiles getApplicationAttemptReportLatency;
 
   private static volatile RouterMetrics INSTANCE = null;
   private static MetricsRegistry registry;
@@ -213,6 +219,16 @@ public void succeededMultipleAppsRetrieved(long duration) {
     getApplicationsReportLatency.add(duration);
   }
 
+  /**
+   * Record a successful getApplicationAttemptReport call.
+   *
+   * @param duration latency of the call in milliseconds.
+   */
+  public void succeededAppAttemptsRetrieved(long duration) {
+    totalSucceededAppAttemptsRetrieved.add(duration);
+    getApplicationAttemptReportLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -233,4 +249,12 @@ public void incrMultipleAppsFailedRetrieved() {
     numMultipleAppsFailedRetrieved.incr();
   }
 
+  /**
+   * Increment the number of applicationAttempt reports that failed to be
+   * retrieved.
+   */
+  public void incrAppAttemptsFailedRetrieved() {
+    numAppAttemptsFailedRetrieved.incr();
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index c4ae6ab4d4b..8ec0a7e0d75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -749,11 +749,81 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
     throw new NotImplementedException("Code is not implemented");
   }
 
+  /**
+   * The YARN Router will forward to the respective YARN RM in which the AM is
+   * running.
+   *
+   * Possible failure:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit the request.
+   *
+   * ResourceManager: the Router will timeout and the call will fail.
+   *
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   *
+   * @param request the request carrying the applicationAttemptId to look up.
+   * @return the response of the home SubCluster's RM; null if the RM returned
+   *         no response.
+   * @throws YarnException if the request or its applicationAttemptId is
+   *         missing, or the home SubCluster of the application cannot be found.
+   * @throws IOException if raised while calling the RM.
+   */
   @Override
   public GetApplicationAttemptReportResponse getApplicationAttemptReport(
       GetApplicationAttemptReportRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long startTime = clock.getTime();
+
+    if (request == null || request.getApplicationAttemptId() == null) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing getApplicationAttemptReport request or applicationAttemptId information.",
+          null);
+    }
+
+    SubClusterId subClusterId = null;
+
+    try {
+      subClusterId = federationFacade
+          .getApplicationHomeSubCluster(request.getApplicationAttemptId().getApplicationId());
+    } catch (YarnException e) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      RouterServerUtil
+          .logAndThrowException("ApplicationAttempt " + request.getApplicationAttemptId()
+              + " belongs to Application " + request.getApplicationAttemptId().getApplicationId()
+              + " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+        getClientRMProxyForSubCluster(subClusterId);
+
+    GetApplicationAttemptReportResponse response = null;
+    try {
+      response = clientRMProxy.getApplicationAttemptReport(request);
+    } catch (Exception e) {
+      routerMetrics.incrAppAttemptsFailedRetrieved();
+      LOG.error("Unable to get the applicationAttempt report for "
+          + request.getApplicationAttemptId() + " to SubCluster "
+          + subClusterId.getId(), e);
+      throw e;
+    }
+
+    if (response == null) {
+      // NOTE(review): a null response is only logged; it is still returned to
+      // the caller and counted as a successful retrieval below.
+      LOG.error("No response when attempting to retrieve the report of "
+          + "the applicationAttempt " + request.getApplicationAttemptId() + " to SubCluster "
+          + subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index ee6e7b8eaf6..5bba1306da7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -29,6 +29,8 @@
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -410,6 +413,89 @@ public void testGetApplicationEmptyRequest()
     }
   }
 
+  /**
+   * This test validates the correctness of GetApplicationAttemptReport in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationAttemptReport()
+      throws YarnException, IOException, InterruptedException {
+    LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptID =
+        ApplicationAttemptId.newInstance(appId, 1);
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application we want the report later
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    GetApplicationAttemptReportRequest requestGet =
+        GetApplicationAttemptReportRequest.newInstance(appAttemptID);
+
+    GetApplicationAttemptReportResponse responseGet =
+        interceptor.getApplicationAttemptReport(requestGet);
+
+    Assert.assertNotNull(responseGet);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationAttemptReport in case the
+   * application does not exist in StateStore.
+   */
+  @Test
+  public void testGetApplicationAttemptNotExists()
+      throws YarnException, IOException, InterruptedException {
+    LOG.info(
+        "Test ApplicationClientProtocol: Get ApplicationAttempt Report - Not Exists");
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptID =
+        ApplicationAttemptId.newInstance(appId, 1);
+    GetApplicationAttemptReportRequest requestGet =
+        GetApplicationAttemptReportRequest.newInstance(appAttemptID);
+    try {
+      interceptor.getApplicationAttemptReport(requestGet);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().equals("ApplicationAttempt " + appAttemptID +
+          " belongs to Application " + appId + " does not exist in FederationStateStore"));
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationAttemptReport in case of
+   * empty request.
+   */
+  @Test
+  public void testGetApplicationAttemptEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    LOG.info(
+        "Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty");
+    try {
+      interceptor.getApplicationAttemptReport(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing getApplicationAttemptReport request or "
+              + "applicationAttemptId information."));
+    }
+    try {
+      interceptor
+          .getApplicationAttemptReport(GetApplicationAttemptReportRequest.newInstance(null));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing getApplicationAttemptReport request or "
+              + "applicationAttemptId information."));
+    }
+  }
+
+
   @Test
   public void testGetClusterMetricsRequest() throws YarnException, IOException {
     LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");