diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 15e1cea..9ee6fe0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -531,8 +532,19 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
       throws YarnException, IOException {
 
     validateRunning();
-
-    return GetApplicationsResponse.newInstance(null);
+    // Build one ACCEPTED report, with attempt 1 and an empty AMRM token,
+    // for every application id tracked in applicationMap.
+    List<ApplicationReport> reports = new ArrayList<>();
+    for (ApplicationId applicationId : applicationMap) {
+      ApplicationReport report = Records.newRecord(ApplicationReport.class);
+      report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+      report.setApplicationId(applicationId);
+      report.setCurrentApplicationAttemptId(
+          ApplicationAttemptId.newInstance(applicationId, 1));
+      report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], ""));
+      reports.add(report);
+    }
+    return GetApplicationsResponse.newInstance(reports);
   }
 
   @Override
@@ -567,8 +579,8 @@ public GetDelegationTokenResponse getDelegationToken(
       GetDelegationTokenRequest request) throws YarnException, IOException {
 
     validateRunning();
-
-    return GetDelegationTokenResponse.newInstance(null);
+    Token token = Token.newInstance(new byte[0], "", new byte[0], "");
+    return GetDelegationTokenResponse.newInstance(token);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 6d75471..3edaac6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -51,6 +51,8 @@
   private MutableGaugeInt numAppsFailedRetrieved;
   @Metric("# of multiple applications reports failed to be retrieved")
   private MutableGaugeInt numMultipleAppsFailedRetrieved;
+  @Metric("# of delegation token failed to obtain")
+  private MutableGaugeInt numDelegationTokenFailedObtain;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -64,6 +66,9 @@
   @Metric("Total number of successful Retrieved multiple apps reports and "
       + "latency(ms)")
   private MutableRate totalSucceededMultipleAppsRetrieved;
+  @Metric("Total number of successful Obtain delegation token and "
+      + "latency(ms)")
+  private MutableRate totalSucceededDelegationTokenObtain;
 
   /**
    * Provide quantile counters for all latencies.
@@ -73,6 +78,7 @@
   private MutableQuantiles killApplicationLatency;
   private MutableQuantiles getApplicationReportLatency;
   private MutableQuantiles getApplicationsReportLatency;
+  private MutableQuantiles obtainDelegationTokenLatency;
 
   private static volatile RouterMetrics INSTANCE = null;
   private static MetricsRegistry registry;
@@ -92,6 +98,9 @@ private RouterMetrics() {
     getApplicationsReportLatency =
         registry.newQuantiles("getApplicationsReportLatency",
             "latency of get applications report", "ops", "latency", 10);
+    obtainDelegationTokenLatency =
+        registry.newQuantiles("obtainDelegationTokenLatency",
+            "latency of get delegation token", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -139,6 +148,11 @@ public long getNumSucceededMultipleAppsRetrieved() {
   }
 
   @VisibleForTesting
+  public long getNumSucceededDelegtationTokenObtain() {
+    return totalSucceededDelegationTokenObtain.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
   }
@@ -164,6 +178,11 @@ public double getLatencySucceededMultipleGetAppReport() {
   }
 
   @VisibleForTesting
+  public double getLatencySucceededDelegationToken() {
+    return totalSucceededDelegationTokenObtain.lastStat().mean();
+  }
+
+  @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
   }
@@ -188,6 +207,11 @@ public int getMultipleAppsFailedRetrieved() {
     return numMultipleAppsFailedRetrieved.value();
   }
 
+  @VisibleForTesting
+  public int getDelegationTokenFailedObtain() {
+    return numDelegationTokenFailedObtain.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
@@ -213,6 +237,11 @@ public void succeededMultipleAppsRetrieved(long duration) {
     getApplicationsReportLatency.add(duration);
   }
 
+  public void succeededDelegationTokenObtain(long duration) {
+    totalSucceededDelegationTokenObtain.add(duration);
+    obtainDelegationTokenLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -233,4 +262,8 @@ public void incrMultipleAppsFailedRetrieved() {
     numMultipleAppsFailedRetrieved.incr();
   }
 
+  public void incrDelegtationTokenFailedObtain() {
+    numDelegationTokenFailedObtain.incr();
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 07eaf97..7cc6855 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -93,9 +93,12 @@
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
@@ -141,6 +144,8 @@
   private RouterPolicyFacade policyFacade;
   private RouterMetrics routerMetrics;
   private final Clock clock = new MonotonicClock();
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
 
   @Override
   public void init(String userName) {
@@ -564,7 +569,48 @@ public GetApplicationReportResponse getApplicationReport(
   @Override
   public GetApplicationsResponse getApplications(GetApplicationsRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException();
+
+    long startTime = clock.getTime();
+    if (request == null) {
+      // Count against the multi-application metric, matching the other
+      // failure paths of this call.
+      routerMetrics.incrMultipleAppsFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing getApplications request or information.",
+          null);
+    }
+    Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      routerMetrics.incrMultipleAppsFailedRetrieved();
+      RouterServerUtil
+          .logAndThrowException("Read FederationStateStore failed.", e);
+    }
+    GetApplicationsResponse reps = null;
+    List<ApplicationReport> apps = new ArrayList<>();
+    try {
+      // Forward the same request to each sub-cluster in subClustersActive
+      // and merge the returned reports into a single list.
+      for (SubClusterId subClusterId : subClustersActive.keySet()) {
+        ApplicationClientProtocol clientRMProxy =
+            getClientRMProxyForSubCluster(subClusterId);
+        reps = clientRMProxy.getApplications(request);
+        if (reps != null) {
+          apps.addAll(reps.getApplicationList());
+        }
+      }
+    } catch (Exception e) {
+      routerMetrics.incrMultipleAppsFailedRetrieved();
+      RouterServerUtil
+          .logAndThrowException("Unable to getApplications.", e);
+    }
+    GetApplicationsResponse response = recordFactory
+        .newRecordInstance(GetApplicationsResponse.class);
+    response.setApplicationList(apps);
+    long stopTime = clock.getTime();
+    routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
@@ -674,7 +720,37 @@ public GetContainersResponse getContainers(GetContainersRequest request)
   @Override
   public GetDelegationTokenResponse getDelegationToken(
       GetDelegationTokenRequest request) throws YarnException, IOException {
-    throw new NotImplementedException();
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getDelegationToken try #{} on SubCluster {}", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+      GetDelegationTokenResponse response = null;
+      try {
+        response = clientRMProxy.getDelegationToken(request);
+      } catch (Exception e) {
+        // Leave response null so this sub-cluster is removed below before
+        // the next attempt.
+        LOG.warn("Unable to getDelegationToken in SubCluster {}",
+            subClusterId.getId(), e);
+      }
+      if (response != null) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededDelegationTokenObtain(stopTime - startTime);
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        subClustersActive.remove(subClusterId);
+      }
+    }
+    routerMetrics.incrDelegtationTokenFailedObtain();
+    String errMsg = "Fail to getDelegationToken.";
+    LOG.error(errMsg);
+    throw new YarnException(errMsg);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 4c18ace..725e099 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -53,6 +53,8 @@ public static void init() {
     Assert.assertEquals(0, metrics.getAppsFailedKilled());
     Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
 
+    Assert.assertEquals(0, metrics.getDelegationTokenFailedObtain());
+
     LOG.info("Test: aggregate metrics are updated correctly");
   }
 
@@ -235,6 +237,51 @@ public void testMulipleAppsReportFailed() {
         metrics.getMultipleAppsFailedRetrieved());
   }
 
+  /**
+   * This test validates the correctness of the metric: Get Delegation Token
+   * successfully.
+   */
+  @Test
+  public void testSucceededDelegationTokenObtaion() {
+
+    long totalGoodBefore = metrics.getNumSucceededDelegtationTokenObtain();
+
+    goodSubCluster.getDelegationToken(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededDelegtationTokenObtain());
+    Assert.assertEquals(100,
+        metrics.getLatencySucceededDelegationToken(),
+        0);
+
+    goodSubCluster.getDelegationToken(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededDelegtationTokenObtain());
+    Assert.assertEquals(150, metrics.getLatencySucceededDelegationToken(),
+        0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to obtain a
+   * Delegation Token.
+   */
+  @Test
+  public void testDelegationTokenFailedObtaion() {
+
+    long totalBadbefore = metrics.getDelegationTokenFailedObtain();
+
+    badSubCluster.getDelegationToken();
+
+    Assert.assertEquals(totalBadbefore + 1,
+        metrics.getDelegationTokenFailedObtain());
+  }
+
   // Records failures for all calls
   private class MockBadSubCluster {
+    public void getDelegationToken() {
+      LOG.info("Mocked: failed getDelegationToken call");
+      metrics.incrDelegtationTokenFailedObtain();
+    }
+
     public void getNewApplication() {
@@ -294,5 +341,11 @@ public void getApplicationsReport(long duration) {
           duration);
       metrics.succeededMultipleAppsRetrieved(duration);
     }
+
+    public void getDelegationToken(long duration) {
+      LOG.info("Mocked: successful getDelegationToken call with duration {}",
+          duration);
+      metrics.succeededDelegationTokenObtain(duration);
+    }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 87dfc95..6ae5fbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -24,6 +24,10 @@
 
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -351,6 +355,53 @@ public void testGetApplicationReport()
   }
 
   /**
+   * This test validates the correctness of GetApplications in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationsReport()
+      throws YarnException, IOException, InterruptedException {
+    System.out
+        .println("Test FederationClientInterceptor: Get Applications report");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContext context = ApplicationSubmissionContext
+        .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+
+    SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(context);
+    // Submit the application we want the report later
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    ApplicationId appId2 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    ApplicationSubmissionContext context2 = ApplicationSubmissionContext
+        .newInstance(appId2, "", "", null, null, false, false, -1, null, null);
+
+    SubmitApplicationRequest request2 =
+        SubmitApplicationRequest.newInstance(context2);
+    // Submit the application we want the report later
+    SubmitApplicationResponse response2 =
+        interceptor.submitApplication(request2);
+
+    Assert.assertNotNull(response2);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId2));
+
+    GetApplicationsRequest requestGet =
+        GetApplicationsRequest.newInstance();
+
+    GetApplicationsResponse responseGet =
+        interceptor.getApplications(requestGet);
+
+    Assert.assertNotNull(responseGet);
+    Assert.assertEquals(2, responseGet.getApplicationList().size());
+  }
+
+  /**
    * This test validates the correctness of GetApplicationReport in case the
    * application does not exist in StateStore.
    */
@@ -400,4 +451,23 @@ public void testGetApplicationEmptyRequest()
     }
   }
 
+  /**
+   * This test validates that Get Delegation Token returns a response for
+   * both a null request and a populated request.
+   */
+  @Test
+  public void testGetDelegationToken() throws IOException, YarnException {
+    System.out.println(
+        "Test FederationClientInterceptor: Get Delegation Token");
+
+    GetDelegationTokenResponse response =
+        interceptor.getDelegationToken(null);
+    Assert.assertNotNull(response);
+
+    GetDelegationTokenRequest request =
+        GetDelegationTokenRequest.newInstance("x");
+
+    response = interceptor.getDelegationToken(request);
+    Assert.assertNotNull(response);
+  }
 }