diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 884e06e4ba0..946922592fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -47,6 +47,8 @@ private MutableGaugeInt numAppsFailedCreated; @Metric("# of applications failed to be killed") private MutableGaugeInt numAppsFailedKilled; + @Metric("# of failApplicationAttempt failed to be failed") + private MutableGaugeInt numAppAttemptsFailedToFail; @Metric("# of application reports failed to be retrieved") private MutableGaugeInt numAppsFailedRetrieved; @Metric("# of multiple applications reports failed to be retrieved") @@ -57,6 +59,8 @@ private MutableRate totalSucceededAppsSubmitted; @Metric("Total number of successful Killed apps and latency(ms)") private MutableRate totalSucceededAppsKilled; + @Metric("Total number of successful Failed appAttempts and latency(ms)") + private MutableRate totalSucceededAppAttempsFailed; @Metric("Total number of successful Created apps and latency(ms)") private MutableRate totalSucceededAppsCreated; @Metric("Total number of successful Retrieved app reports and latency(ms)") @@ -71,6 +75,7 @@ private MutableQuantiles submitApplicationLatency; private MutableQuantiles getNewApplicationLatency; private MutableQuantiles killApplicationLatency; + private MutableQuantiles failApplicationAttemptLatency; private MutableQuantiles getApplicationReportLatency; private MutableQuantiles getApplicationsReportLatency; @@ -86,6 +91,9 @@ private RouterMetrics() { "latency of submit application", "ops", "latency", 10); killApplicationLatency = registry.newQuantiles("killApplicationLatency", "latency of kill application", "ops", "latency", 10); + failApplicationAttemptLatency = registry.newQuantiles( + "failApplicationAttemptLatency", + "latency of fail applicationAttempt", "ops", "latency", 10); getApplicationReportLatency = registry.newQuantiles("getApplicationReportLatency", "latency of get application report", "ops", "latency", 10); @@ -203,6 +211,11 @@ public void succeededAppsKilled(long duration) { killApplicationLatency.add(duration); } + public void succeededAppAttemptsToFail(long duration) { + totalSucceededAppAttempsFailed.add(duration); + failApplicationAttemptLatency.add(duration); + } + public void succeededAppsRetrieved(long duration) { totalSucceededAppsRetrieved.add(duration); getApplicationReportLatency.add(duration); @@ -225,6 +238,10 @@ public void incrAppsFailedKilled() { numAppsFailedKilled.incr(); } + public void incrAppAttemptsFailedToFail() { + numAppAttemptsFailedToFail.incr(); + } + public void incrAppsFailedRetrieved() { numAppsFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index a721fe0d8ec..c85cb1165ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -792,10 +792,77 @@ public CancelDelegationTokenResponse cancelDelegationToken( throw new NotImplementedException("Code is not implemented"); } + /** + * The YARN Router will forward to the respective YARN RM in which the AM is + * running. + * + * Possible failure: + * + * Client: identical behavior as {@code ClientRMService}. + * + * Router: the Client will timeout and resubmit the request. + * + * ResourceManager: the Router will timeout and the call will fail. + * + * State Store: the Router will timeout and it will retry depending on the + * FederationFacade settings - if the failure happened before the select + * operation. + */ @Override public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + long startTime = clock.getTime(); + + if (request == null || request.getApplicationAttemptId() == null + || request.getApplicationAttemptId().getApplicationId() == null) { + routerMetrics.incrAppAttemptsFailedToFail(); + RouterServerUtil.logAndThrowException( + "Missing failApplicationAttempt " + + "request or applicationId " + + "or applicationAttemptId information.", + null); + } + + SubClusterId subClusterId = null; + + try { + subClusterId = federationFacade + .getApplicationHomeSubCluster( + request.getApplicationAttemptId().getApplicationId()); + } catch (YarnException e) { + routerMetrics.incrAppAttemptsFailedToFail(); + RouterServerUtil + .logAndThrowException("ApplicationAttempt " + + request.getApplicationAttemptId() + + "belongs to Application " + + request.getApplicationAttemptId().getApplicationId() + + " does not exist in FederationStateStore", e); + } + + ApplicationClientProtocol clientRMProxy = + getClientRMProxyForSubCluster(subClusterId); + + FailApplicationAttemptResponse response = null; + try { + response = clientRMProxy.failApplicationAttempt(request); + } catch (Exception e) { + routerMetrics.incrAppAttemptsFailedToFail(); + LOG.error("Unable to fail the applicationAttempt for " + + request.getApplicationAttemptId() + "to SubCluster " + + subClusterId.getId(), e); + throw e; + } + + if (response == null) { + LOG.error("No response when attempting to fail " + + "the applicationAttempt " + + request.getApplicationAttemptId() + " to SubCluster " + + subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededAppAttemptsToFail(stopTime - startTime); + return response; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index ee6e7b8eaf6..259b44b05ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -26,7 +26,10 @@ import java.util.Map; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; @@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; @@ -177,7 +181,7 @@ private SubmitApplicationRequest mockSubmitApplicationRequest( ApplicationId appId) { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ApplicationSubmissionContext context = ApplicationSubmissionContext - .newInstance(appId, MockApps.newAppName(), "q1", + .newInstance(appId, MockApps.newAppName(), "default", Priority.newInstance(0), amContainerSpec, false, false, -1, Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), @@ -332,6 +336,101 @@ public void testForceKillApplicationEmptyRequest() } } + /** + * This test validates the correctness of + * FailApplicationAttempt in case the + * application exists in the cluster. + */ + @Test + public void testfailApplicationAttempt() + throws YarnException, IOException, InterruptedException { + LOG.info("Test FederationClientInterceptor: " + + "Fail ApplicationAttempt"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptID = + ApplicationAttemptId.newInstance(appId, 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application we want the fail + // applicationAttempt later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + FailApplicationAttemptRequest requestFail = + FailApplicationAttemptRequest.newInstance(appAttemptID); + + FailApplicationAttemptResponse responseFail = + interceptor.failApplicationAttempt(requestFail); + + Assert.assertNotNull(responseFail); + } + + /** + * This test validates the correctness of + * FailApplicationAttempt in case the + * application does not exist in StateStore. + */ + @Test + public void testfailApplicationAttemptNotExists() + throws Exception { + LOG.info( + "Test ApplicationClientProtocol: " + + "Fail ApplicationAttempt - Not Exists"); + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptID = + ApplicationAttemptId.newInstance(appId, 1); + FailApplicationAttemptRequest requestFail = + FailApplicationAttemptRequest.newInstance(appAttemptID); + + LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " + + appAttemptID + "belongs to Application " + + appId + " does not exist in FederationStateStore", + () -> interceptor.failApplicationAttempt(requestFail)); + } + + /** + * This test validates + * the correctness of FailApplicationAttempt in case of + * empty request. + */ + @Test + public void testfailApplicationAttemptEmptyRequest() + throws Exception { + LOG.info("Test FederationClientInterceptor: " + + "Fail ApplicationAttempt - Empty"); + + LambdaTestUtils.intercept(YarnException.class, + "Missing failApplicationAttempt " + + "request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor.failApplicationAttempt(null)); + + LambdaTestUtils.intercept(YarnException.class, + "Missing failApplicationAttempt " + + "request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor + .failApplicationAttempt( + FailApplicationAttemptRequest + .newInstance(null))); + + LambdaTestUtils.intercept(YarnException.class, + "Missing failApplicationAttempt " + + "request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor + .failApplicationAttempt( + FailApplicationAttemptRequest.newInstance( + ApplicationAttemptId + .newInstance(null, 1) + ))); + } + /** * This test validates the correctness of GetApplicationReport in case the * application exists in the cluster.