diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index a721fe0d8ec..34cacc78439 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -682,10 +682,22 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
     throw new NotImplementedException("Code is not implemented");
   }
 
+  // Currently only queues that are configured consistently across all
+  // sub-clusters are supported.
   @Override
   public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    Map<SubClusterId, SubClusterInfo> subclusters =
+        federationFacade.getSubClusters(true);
+    ClientMethod remoteMethod = new ClientMethod("getQueueInfo",
+        new Class[] {GetQueueInfoRequest.class}, new Object[] {request});
+    ArrayList<SubClusterId> clusterList =
+        new ArrayList<>(subclusters.keySet());
+    Map<SubClusterId, GetQueueInfoResponse> queueInfoResponseMap =
+        invokeConcurrent(clusterList, remoteMethod,
+            GetQueueInfoResponse.class);
+    return RouterYarnClientUtils.mergeQueueInfo(
+        queueInfoResponseMap.values());
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
index 50abcf40a80..9bf3ee1dce0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
@@ -17,9 +17,18 @@
  */
 package org.apache.hadoop.yarn.server.router.clientrm;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueStatistics;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 /**
  * Util class for Router Yarn client API calls.
@@ -52,4 +61,117 @@ public static GetClusterMetricsResponse merge(
     }
     return GetClusterMetricsResponse.newInstance(tmp);
   }
+
+  public static GetQueueInfoResponse mergeQueueInfo(
+      Collection<GetQueueInfoResponse> responses) throws YarnException {
+    List<GetQueueInfoResponse> list = new ArrayList<>(responses);
+    QueueInfo tmp = QueueInfo.newInstance(null, 0, 0, 0, null, null, null,
+        null, null, null, true);
+    // The fixed fields must be identical across all sub-clusters.
+    if (!list.isEmpty()) {
+      QueueInfo first = list.get(0).getQueueInfo();
+      tmp.setQueueName(first.getQueueName());
+      tmp.setQueueState(first.getQueueState());
+      tmp.setAccessibleNodeLabels(first.getAccessibleNodeLabels());
+      tmp.setDefaultNodeLabelExpression(first.getDefaultNodeLabelExpression());
+      tmp.setPreemptionDisabled(first.getPreemptionDisabled());
+      tmp.setIntraQueuePreemptionDisabled(
+          first.getIntraQueuePreemptionDisabled());
+      tmp.setQueueConfigurations(first.getQueueConfigurations());
+      tmp.setQueueStatistics(QueueStatistics.newInstance(
+          0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
+    }
+    for (GetQueueInfoResponse response : list) {
+      QueueInfo info = response.getQueueInfo();
+      if (!Objects.equals(tmp.getQueueName(), info.getQueueName())) {
+        throw new YarnException(
+            "The QueueName is not consistent across sub-clusters.");
+      }
+      if (tmp.getQueueState() != info.getQueueState()) {
+        throw new YarnException(
+            "The QueueState is not consistent across sub-clusters.");
+      }
+      if (!Objects.equals(tmp.getAccessibleNodeLabels(),
+          info.getAccessibleNodeLabels())) {
+        throw new YarnException("The Queue AccessibleNodeLabels are "
+            + "not consistent across sub-clusters.");
+      }
+      if (!Objects.equals(tmp.getDefaultNodeLabelExpression(),
+          info.getDefaultNodeLabelExpression())) {
+        throw new YarnException("The Queue DefaultNodeLabelExpression is "
+            + "not consistent across sub-clusters.");
+      }
+      if (tmp.getPreemptionDisabled() != info.getPreemptionDisabled()) {
+        throw new YarnException("The Queue PreemptionDisabled is "
+            + "not consistent across sub-clusters.");
+      }
+      if (tmp.getIntraQueuePreemptionDisabled()
+          != info.getIntraQueuePreemptionDisabled()) {
+        throw new YarnException("The Queue IntraQueuePreemptionDisabled is "
+            + "not consistent across sub-clusters.");
+      }
+      if (!Objects.equals(tmp.getQueueConfigurations(),
+          info.getQueueConfigurations())) {
+        throw new YarnException("The Queue QueueConfigurations are "
+            + "not consistent across sub-clusters.");
+      }
+      // TODO: compute the total absolute capacity across all sub-clusters.
+      // Summing the relative capacity values below is not correct; the
+      // absolute values should be summed instead.
+      tmp.setCapacity(tmp.getCapacity() + info.getCapacity());
+      tmp.setMaximumCapacity(tmp.getMaximumCapacity()
+          + info.getMaximumCapacity());
+      tmp.setCurrentCapacity(tmp.getCurrentCapacity()
+          + info.getCurrentCapacity());
+
+      List<QueueInfo> currChild = new ArrayList<>(tmp.getChildQueues());
+      currChild.addAll(info.getChildQueues());
+      tmp.setChildQueues(currChild);
+      List<ApplicationReport> currReport =
+          new ArrayList<>(tmp.getApplications());
+      currReport.addAll(info.getApplications());
+      tmp.setApplications(currReport);
+
+      QueueStatistics currStats = tmp.getQueueStatistics();
+      QueueStatistics stats = info.getQueueStatistics();
+      currStats.setAllocatedContainers(currStats.getAllocatedContainers()
+          + stats.getAllocatedContainers());
+      currStats.setAllocatedMemoryMB(currStats.getAllocatedMemoryMB()
+          + stats.getAllocatedMemoryMB());
+      currStats.setAllocatedVCores(currStats.getAllocatedVCores()
+          + stats.getAllocatedVCores());
+      currStats.setAvailableMemoryMB(currStats.getAvailableMemoryMB()
+          + stats.getAvailableMemoryMB());
+      currStats.setAvailableVCores(currStats.getAvailableVCores()
+          + stats.getAvailableVCores());
+      currStats.setNumActiveUsers(currStats.getNumActiveUsers()
+          + stats.getNumActiveUsers());
+      currStats.setNumAppsCompleted(currStats.getNumAppsCompleted()
+          + stats.getNumAppsCompleted());
+      currStats.setNumAppsFailed(currStats.getNumAppsFailed()
+          + stats.getNumAppsFailed());
+      currStats.setNumAppsKilled(currStats.getNumAppsKilled()
+          + stats.getNumAppsKilled());
+      currStats.setNumAppsPending(currStats.getNumAppsPending()
+          + stats.getNumAppsPending());
+      currStats.setNumAppsRunning(currStats.getNumAppsRunning()
+          + stats.getNumAppsRunning());
+      currStats.setNumAppsSubmitted(currStats.getNumAppsSubmitted()
+          + stats.getNumAppsSubmitted());
+      currStats.setPendingContainers(currStats.getPendingContainers()
+          + stats.getPendingContainers());
+      currStats.setPendingMemoryMB(currStats.getPendingMemoryMB()
+          + stats.getPendingMemoryMB());
+      currStats.setPendingVCores(currStats.getPendingVCores()
+          + stats.getPendingVCores());
+      currStats.setReservedContainers(currStats.getReservedContainers()
+          + stats.getReservedContainers());
+      currStats.setReservedMemoryMB(currStats.getReservedMemoryMB()
+          + stats.getReservedMemoryMB());
+      currStats.setReservedVCores(currStats.getReservedVCores()
+          + stats.getReservedVCores());
+      tmp.setQueueStatistics(currStats);
+    }
+    return GetQueueInfoResponse.newInstance(tmp);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index ee6e7b8eaf6..d358f258034 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -27,16 +27,18 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -431,4 +433,17 @@ public void testGetClusterMetricsRequest() throws YarnException, IOException {
         GetClusterMetricsResponse.class);
     Assert.assertEquals(true, clusterMetrics.isEmpty());
   }
+
+  @Test
+  public void testGetQueueInfo() throws YarnException, IOException {
+    LOG.info("Test FederationClientInterceptor : Get QueueInfo");
+    GetQueueInfoRequest request = GetQueueInfoRequest.newInstance("default",
+        false, false, false);
+    GetQueueInfoResponse response = interceptor.getQueueInfo(request);
+    // Basic sanity checks; the merged response should never be null.
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(response.getQueueInfo());
+    // TODO: verify the merged capacity and queue statistics once the
+    // absolute-capacity aggregation is settled.
+  }
 }
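
A minimal usage sketch, not part of the patch, of how a client could exercise the new getQueueInfo path end to end. It assumes the client's yarn.resourcemanager.address is pointed at the Router's client RM service; the router-host:8050 address and the class name RouterQueueInfoExample are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RouterQueueInfoExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Assumption: the Router's client RM service address; clients talk to
    // the Router exactly as they would talk to a ResourceManager.
    conf.set(YarnConfiguration.RM_ADDRESS, "router-host:8050");

    try (YarnClient client = YarnClient.createYarnClient()) {
      client.init(conf);
      client.start();
      // The Router fans this call out to every active sub-cluster and
      // returns the QueueInfo merged by RouterYarnClientUtils.mergeQueueInfo.
      QueueInfo queue = client.getQueueInfo("default");
      System.out.println("capacity=" + queue.getCapacity()
          + ", applications=" + queue.getApplications().size());
    }
  }
}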