diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/AppAttemptInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/AppAttemptInfo.java index cd30bf0..cae1743 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/AppAttemptInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/AppAttemptInfo.java @@ -104,4 +104,43 @@ public long getFinishedTime() { return finishedTime; } + public void setAppAttemptId(String appAttemptId) { + this.appAttemptId = appAttemptId; + } + + public void setHost(String host) { + this.host = host; + } + + public void setRpcPort(int rpcPort) { + this.rpcPort = rpcPort; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + } + + public void setOriginalTrackingUrl(String originalTrackingUrl) { + this.originalTrackingUrl = originalTrackingUrl; + } + + public void setDiagnosticsInfo(String diagnosticsInfo) { + this.diagnosticsInfo = diagnosticsInfo; + } + + public void setAppAttemptState(YarnApplicationAttemptState appAttemptState) { + this.appAttemptState = appAttemptState; + } + + public void setAmContainerId(String amContainerId) { + this.amContainerId = amContainerId; + } + + public void setStartedTime(long startedTime) { + this.startedTime = startedTime; + } + + public void setFinishedTime(long finishedTime) { + this.finishedTime = finishedTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java index 2c3a8a5..0b3b90a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeLabelsInfo.java @@ -49,7 +49,14 @@ public NodeLabelsInfo(List nodeLabels) { this.nodeLabelsInfo.add(new NodeLabelInfo(label)); } } - + + public NodeLabelsInfo(HashSet nodeLabels) { + this.nodeLabelsInfo = new ArrayList(); + for (NodeLabel label : nodeLabels) { + this.nodeLabelsInfo.add(new NodeLabelInfo(label)); + } + } + public NodeLabelsInfo(Set nodeLabelsName) { this.nodeLabelsInfo = new ArrayList(); for (String labelName : nodeLabelsName) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java index 0b6e4bc..3b35aef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeToLabelsInfo.java @@ -38,4 +38,8 @@ public NodeToLabelsInfo() { public HashMap getNodeToLabels() { return nodeToLabels; } + + public void setNodeToLabels(HashMap nodeToLabels) { + this.nodeToLabels = nodeToLabels; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java index e12dd5f..0790668 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java @@ -53,4 +53,15 @@ public long getCount() { return count; } + public void setCount(long count) { + this.count = count; + } + + public void setState(YarnApplicationState state) { + this.state = state; + } + + public void setType(String type) { + this.type = type; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 626d794..7dba7a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -969,7 +969,6 @@ public ClusterMetricsInfo call() { } // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); @@ -979,7 +978,7 @@ public ClusterMetricsInfo call() { RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse); } } catch (Throwable e) { - LOG.warn("Failed to get nodes report ", e); + LOG.warn("Failed to get Cluster Metrics ", e); } } @@ -1021,19 +1020,125 @@ public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, @Override public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr, Set stateQueries, Set typeQueries) { - throw new NotImplementedException(); + ApplicationStatisticsInfo appStatisticInfo = new ApplicationStatisticsInfo(); + Map subClustersActive = null; + try { + subClustersActive = federationFacade.getSubClusters(true); + } catch (YarnException e) { + LOG.error(e.getLocalizedMessage()); + return appStatisticInfo; + } + + final HttpServletRequest hsrCopy = clone(hsr); + // Send the requests in parallel + CompletionService compSvc = + new ExecutorCompletionService<>(this.threadpool); + + for (final SubClusterInfo info : subClustersActive.values()) { + compSvc.submit(new Callable() { + @Override + public ApplicationStatisticsInfo call() { + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + try { + return interceptor.getAppStatistics(hsrCopy, stateQueries, typeQueries); + } catch (Exception e) { + LOG.error("Subcluster {} failed to return AppStatistics.", + info.getSubClusterId(), e); + return null; + } + } + }); + } + // Collect all the responses in parallel + for (int i = 0; i < subClustersActive.size(); i++) { + try { + Future future = compSvc.take(); + ApplicationStatisticsInfo xinfo = future.get(); + if (xinfo != null) { + appStatisticInfo = RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticInfo, xinfo); + } + } catch (Throwable e) { + LOG.warn("Failed to get App Statistics report ", e); + } + } + return appStatisticInfo; } @Override public AppState getAppState(HttpServletRequest hsr, String appId) throws AuthorizationException { - throw new NotImplementedException(); + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppState {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppState(hsr, appId); } @Override public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) throws IOException { - throw new NotImplementedException(); + NodeToLabelsInfo nodeToLabelsInfo = new NodeToLabelsInfo(); + Map subClustersActive = null; + try { + subClustersActive = federationFacade.getSubClusters(true); + } catch (YarnException e) { + LOG.error(e.getLocalizedMessage()); + return nodeToLabelsInfo; + } + + final HttpServletRequest hsrCopy = clone(hsr); + // Send the requests in parallel + CompletionService compSvc = + new ExecutorCompletionService<>(this.threadpool); + + for (final SubClusterInfo info : subClustersActive.values()) { + compSvc.submit(new Callable() { + @Override + public NodeToLabelsInfo call() { + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + try { + return interceptor.getNodeToLabels(hsrCopy); + } catch (Exception e) { + LOG.error("Subcluster {} failed to return NodeToLabels.", + info.getSubClusterId(), e); + return null; + } + } + }); + } + + // Collect all the responses in parallel + for (int i = 0; i < subClustersActive.size(); i++) { + try { + Future future = compSvc.take(); + NodeToLabelsInfo xinfo = future.get(); + + if (xinfo != null) { + nodeToLabelsInfo = RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfo, xinfo); + } + } catch (Throwable e) { + LOG.error("Failed to get NodeToLabels ", e); + } + } + + return nodeToLabelsInfo; } @Override @@ -1081,27 +1186,113 @@ public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId) @Override public AppPriority getAppPriority(HttpServletRequest hsr, String appId) throws AuthorizationException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppPriority {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppPriority(hsr, appId); } @Override public Response updateApplicationPriority(AppPriority targetPriority, HttpServletRequest hsr, String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (targetPriority == null) { + throw new NotFoundException("targetPriority, " + targetPriority + ", is emprty."); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to update the AppPriority {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.updateApplicationPriority(targetPriority, hsr, appId); } @Override public AppQueue getAppQueue(HttpServletRequest hsr, String appId) throws AuthorizationException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppQueue {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppQueue(hsr, appId); } @Override public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (targetQueue == null || targetQueue.getQueue().isEmpty()) { + throw new NotFoundException("targetQueue, " + targetQueue + ", is empty or null"); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to update the AppQueue {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.updateAppQueue(targetQueue, hsr, appId); } @Override @@ -1162,44 +1353,206 @@ public Response listReservation(String queue, String reservationId, @Override public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId, String type) throws AuthorizationException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (type == null || type.isEmpty()) { + throw new NotFoundException("type, " + type + ", is empty or null"); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppTimeout {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppTimeout(hsr, appId, type); } @Override public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId) throws AuthorizationException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppTimeouts {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppTimeouts(hsr, appId); } @Override public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, HttpServletRequest hsr, String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (appTimeout == null) { + throw new NotFoundException("appTimeout, " + appTimeout + ", is empty or null"); + } + + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to update the ApplicationTimeout {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.updateApplicationTimeout(appTimeout, hsr, appId); } @Override public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppAttempts {} ", + appId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppAttempts(hsr, appId); } @Override public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res, String appId, String appAttemptId) { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (appAttemptId == null || appAttemptId.isEmpty()) { + throw new NotFoundException("appAttemptId, " + appAttemptId + ", is empty or null"); + } + + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppAttempt appId: {} appAttemptId: {} ", + appId, appAttemptId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppAttempt(req, res, appId, appAttemptId); } @Override public ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res, String appId, String appAttemptId) { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (appAttemptId == null || appAttemptId.isEmpty()) { + throw new NotFoundException("appAttemptId, " + appAttemptId + ", is empty or null"); + } + + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.warn("Unable to get the Containers, appId: {} appAttemptId: {} ", + appId, appAttemptId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getContainers(req, res, appId, appAttemptId); } @Override public ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res, String appId, String appAttemptId, String containerId) { - throw new NotImplementedException(); + if (appId == null || appId.isEmpty()) { + throw new NotFoundException("appId, " + appId + ", is empty or null"); + } + if (appAttemptId == null || appAttemptId.isEmpty()) { + throw new NotFoundException("appAttemptId, " + appAttemptId + ", is empty or null"); + } + if (containerId == null || containerId.isEmpty()) { + throw new NotFoundException("containerId, " + containerId + ", is empty or null"); + } + + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + LOG.error("Unable to get the AppAttempt appId: {} appAttemptId: {} containerId: {} ", + appId, appAttemptId, containerId, e); + return null; + } + + SubClusterInfo subClusterInfo = getHomeSubClusterInfo(applicationId); + if (subClusterInfo == null) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getContainer(req, res, appId, appAttemptId, containerId); } @Override @@ -1216,4 +1569,22 @@ public void shutdown() { threadpool.shutdown(); } } + + private SubClusterInfo getHomeSubClusterInfo(ApplicationId applicationId) { + SubClusterInfo subClusterInfo = null; + SubClusterId subClusterId = null; + try { + subClusterId = + federationFacade.getApplicationHomeSubCluster(applicationId); + if (subClusterId == null) { + LOG.error("Can't get HomeSubCluster by applicationId {} ", applicationId); + return null; + } + subClusterInfo = federationFacade.getSubCluster(subClusterId); + } catch (YarnException e) { + LOG.error("Get HomeSubClusterInfo by applicationId {} failed.", applicationId, e); + return null; + } + return subClusterInfo; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 40bdbd8..ac730f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.HashSet; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.HttpHeaders; @@ -39,12 +40,17 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; @@ -461,4 +467,67 @@ public static void mergeMetrics(ClusterMetricsInfo metrics, return header; } + /** + * Merges a list of ApplicationStatisticsInfo grouping by type and state. Our current policy is + * to merge the statistics reports from the reacheable SubClusters. + * + * @param first first ApplicationStatisticsInfo to merge + * @param second second ApplicationStatisticsInfo to merge + * @return the merged ApplicationStatisticsInfo + */ + public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo(ApplicationStatisticsInfo first, + ApplicationStatisticsInfo second) { + ApplicationStatisticsInfo result = new ApplicationStatisticsInfo(); + HashMap map = new HashMap(); + + for (StatisticsItemInfo item : first.getStatItems()) { + map.put(item.getType() + item.getState().toString(), + new StatisticsItemInfo(item.getState(), item.getType(), item.getCount())); + } + + String key = null; + for (StatisticsItemInfo item : second.getStatItems()) { + key = item.getType() + item.getState().toString(); + StatisticsItemInfo a = map.get(key); + if (a != null) { + a.setCount(a.getCount() + item.getCount()); + map.put(key, a); + } else { + map.put(key, item); + } + } + for (StatisticsItemInfo item : map.values()) { + result.add(item); + } + return result; + } + + /** + * Merges a list of NodeToLabelsInfo grouping by type and state. Our current policy is + * to merge the nodes reports from the reacheable SubClusters. + * + * @param first first NodeToLabelsInfo to merge + * @param second second NodeToLabelsInfo to merge + * @return the merged NodeToLabelsInfo + */ + public static NodeToLabelsInfo mergeNodeToLabels(NodeToLabelsInfo first, + NodeToLabelsInfo second) { + NodeToLabelsInfo nodeToLabelsInfo = new NodeToLabelsInfo(); + HashMap nodeToLabels = first.getNodeToLabels(); + for (Map.Entry item : second.getNodeToLabels().entrySet()) { + if (nodeToLabels.containsKey(item.getKey())) { + NodeLabelsInfo a = nodeToLabels.get(item.getKey()); + NodeLabelsInfo b = item.getValue(); + + HashSet hashSet = new HashSet<>(); + hashSet.addAll(a.getNodeLabels()); + hashSet.addAll(b.getNodeLabels()); + nodeToLabels.put(item.getKey(), new NodeLabelsInfo(hashSet)); + } else { + nodeToLabels.put(item.getKey(), item.getValue()); + } + } + nodeToLabelsInfo.setNodeToLabels(nodeToLabels); + return nodeToLabelsInfo; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 9f54582..8d77711 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -20,20 +20,32 @@ import java.io.IOException; import java.net.ConnectException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ApplicationTimeout; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; @@ -43,7 +55,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; +import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +82,8 @@ // This property allows us to write tests for specific scenario as YARN RM // down e.g. network issue, failover. private boolean isRunning = true; - private HashSet applicationMap = new HashSet<>(); + private HashMap applicationMap + = new HashMap<>(); private void validateRunning() throws ConnectException { if (!isRunning) { @@ -89,7 +112,24 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId()); LOG.info("Application submitted: " + appId); - applicationMap.add(appId); + ApplicationReport report = ApplicationReport.newInstance(appId, + ApplicationAttemptId.newInstance(appId, 1), + null, newApp.getQueue(), + null, null, 0, null, + YarnApplicationState.ACCEPTED, "", null, + 0, 0, null, + null, null, + 0, null, null, null, + false, Priority.newInstance(newApp.getPriority()), + null, null); + HashMap timeouts + = new HashMap<>(); + timeouts.put(ApplicationTimeoutType.LIFETIME, + ApplicationTimeout.newInstance(ApplicationTimeoutType.LIFETIME, + "UNLIMITED", + 10)); + report.setApplicationTimeouts(timeouts); + applicationMap.put(appId, report); return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "") .entity(getSubClusterId()).build(); } @@ -102,7 +142,7 @@ public AppInfo getApp(HttpServletRequest hsr, String appId, } ApplicationId applicationId = ApplicationId.fromString(appId); - if (!applicationMap.contains(applicationId)) { + if (!applicationMap.containsKey(applicationId)) { throw new NotFoundException("app with id: " + appId + " not found"); } @@ -137,13 +177,18 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr, validateRunning(); ApplicationId applicationId = ApplicationId.fromString(appId); - if (!applicationMap.remove(applicationId)) { - throw new ApplicationNotFoundException( - "Trying to kill an absent application: " + appId); - } - if (targetState == null) { return Response.status(Status.BAD_REQUEST).build(); + } else if (targetState != null && targetState.getState().equals("KILLED")) { + if (applicationMap.remove(applicationId) == null) { + throw new ApplicationNotFoundException( + "Trying to kill an absent application: " + appId); + } + } else { + if (applicationMap.containsKey(applicationId)) { + applicationMap.get(applicationId).setYarnApplicationState( + YarnApplicationState.valueOf(targetState.getState())); + } } LOG.info("Force killing application: " + appId); @@ -192,6 +237,263 @@ public ClusterMetricsInfo getClusterMetricsInfo() { return metrics; } + @Override + public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr, + Set stateQueries, + Set typeQueries) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ConcurrentHashMap result = new ConcurrentHashMap(); + ApplicationStatisticsInfo metrics = new ApplicationStatisticsInfo(); + Long count = Long.valueOf(0); + for (HashMap.Entry item : applicationMap.entrySet()) { + count = result.get(item.getValue().getYarnApplicationState().toString()); + if (count == null) { + result.put(item.getValue().getYarnApplicationState().toString(), 1l); + } else { + result.put(item.getValue().getYarnApplicationState().toString(), count + 1l); + } + } + + for (HashMap.Entry item : result.entrySet()) { + metrics.add(new StatisticsItemInfo(YarnApplicationState.valueOf(item.getKey()), + "MapReduce", item.getValue())); + } + return metrics; + } + + @Override + public AppState getAppState(HttpServletRequest hsr, String appId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + YarnApplicationState state = applicationMap.get(applicationId).getYarnApplicationState(); + return new AppState(state.toString()); + } + + + @Override + public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + String cpuLabel = "CPU"; + HashSet cpulables = new HashSet<>(); + cpulables.add(cpuLabel); + + String gpuLabel = "GPU"; + HashSet gpulables = new HashSet<>(); + gpulables.add(gpuLabel); + + NodeLabelsInfo cpuNode = new NodeLabelsInfo(cpulables); + NodeLabelsInfo gpuNode = new NodeLabelsInfo(gpulables); + + NodeToLabelsInfo info = new NodeToLabelsInfo(); + HashMap nodeLabels = new HashMap<>(); + nodeLabels.put("node1", cpuNode); + nodeLabels.put("node2", gpuNode); + info.setNodeToLabels(nodeLabels); + return info; + } + + @Override + public Response updateApplicationPriority(AppPriority targetPriority, + HttpServletRequest hsr, String appId) throws AuthorizationException, + YarnException, InterruptedException, IOException { + validateRunning(); + + ApplicationId applicationId = ApplicationId.fromString(appId); + if (targetPriority == null) { + return Response.status(Status.BAD_REQUEST).build(); + } + + LOG.info("update application priority: " + appId); + applicationMap.get(applicationId) + .setPriority(Priority.newInstance(targetPriority.getPriority())); + return Response.status(Status.OK).build(); + } + + @Override + public AppPriority getAppPriority(HttpServletRequest hsr, String appId) + throws AuthorizationException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + int priority = applicationMap.get(applicationId).getPriority().getPriority(); + return new AppPriority(priority); + } + + @Override + public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, + String appId) throws AuthorizationException, YarnException, + InterruptedException, IOException { + validateRunning(); + ApplicationId applicationId = ApplicationId.fromString(appId); + if (targetQueue == null) { + return Response.status(Status.BAD_REQUEST).build(); + } + + LOG.info("update application queue: " + appId); + applicationMap.get(applicationId).setQueue(targetQueue.getQueue()); + return Response.status(Status.OK).build(); + } + + @Override + public AppQueue getAppQueue(HttpServletRequest hsr, String appId) + throws AuthorizationException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + String queue = applicationMap.get(applicationId).getQueue(); + return new AppQueue(queue); + } + + @Override + public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId, + String type) throws AuthorizationException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + Map timeouts = + applicationMap.get(applicationId).getApplicationTimeouts(); + + if (!timeouts.containsKey(ApplicationTimeoutType.valueOf(type))) { + throw new NotFoundException("timeout with id: " + appId + " not found"); + } + ApplicationTimeout value = timeouts.get(ApplicationTimeoutType.valueOf(type)); + + AppTimeoutInfo timeoutInfo = new AppTimeoutInfo(); + timeoutInfo.setExpiryTime(value.getExpiryTime()); + timeoutInfo.setTimeoutType(value.getTimeoutType()); + timeoutInfo.setRemainingTime(value.getRemainingTime()); + return timeoutInfo; + } + + @Override + public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId) + throws AuthorizationException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + Map timeouts = + applicationMap.get(applicationId).getApplicationTimeouts(); + AppTimeoutsInfo timeoutsInfo = new AppTimeoutsInfo(); + for (ApplicationTimeout timeout : timeouts.values()) { + AppTimeoutInfo timeoutInfo = new AppTimeoutInfo(); + timeoutInfo.setExpiryTime(timeout.getExpiryTime()); + timeoutInfo.setTimeoutType(timeout.getTimeoutType()); + timeoutInfo.setRemainingTime(timeout.getRemainingTime()); + timeoutsInfo.add(timeoutInfo); + } + + return timeoutsInfo; + } + + @Override + public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, + HttpServletRequest hsr, String appId) throws AuthorizationException, + YarnException, InterruptedException, IOException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + Map timeouts = + applicationMap.get(applicationId).getApplicationTimeouts(); + + if (!timeouts.containsKey(appTimeout.getTimeoutType())) { + throw new NotFoundException("TimeOutType with id: " + appId + " not found"); + } + for (ApplicationTimeout time : timeouts.values()) { + time.setExpiryTime(appTimeout.getExpireTime()); + } + return Response.status(Status.OK).build(); + } + + @Override + public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + ApplicationReport report = applicationMap.get(applicationId); + AppAttemptsInfo info = new AppAttemptsInfo(); + ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId(); + info.add(new AppAttemptInfo()); + return info; + } + @Override + public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt(HttpServletRequest req, + HttpServletResponse res, String appId, String appAttemptId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo info = + new org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo(); + info.setAppAttemptId(applicationMap.get(applicationId) + .getCurrentApplicationAttemptId().toString()); + return info; + } + + @Override + public ContainersInfo getContainers(HttpServletRequest req, + HttpServletResponse res, String appId, String appAttemptId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + + return new ContainersInfo(); + } + + @Override + public ContainerInfo getContainer(HttpServletRequest req, + HttpServletResponse res, String appId, String appAttemptId, + String containerId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + + return new ContainerInfo(); + } + public void setSubClusterId(int subClusterId) { setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId))); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index fae4ecf..adf4f1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -21,10 +21,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.HashMap; import javax.ws.rs.core.Response; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -40,8 +44,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; +import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.junit.Assert; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +69,7 @@ * {@code RouterClientRMService} has been written cleverly so that it can be * reused to validate different request intercepter chains. */ +@FixMethodOrder(value = MethodSorters.NAME_ASCENDING) public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { private static final Logger LOG = LoggerFactory.getLogger(TestFederationInterceptorREST.class); @@ -443,4 +461,413 @@ public void testGetClusterMetrics() { // The merge operations is tested in TestRouterWebServiceUtil } + /** + * This test validates the correctness of getAppStatistics in case each + * SubCluster provided a AppStatistics with appsSubmitted set to the + * SubClusterId. The expected result would be appSubmitted equals to 1. + * SubClusterId in this case is an integer. + */ + @Test + public void testGetAppStatistics() + throws IOException, InterruptedException, YarnException { + AppState appStateRUNNING = new AppState(YarnApplicationState.RUNNING.name()); + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + // Submit the application we are going to kill later + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + Response responseRUNNING = + interceptor.updateAppState(appStateRUNNING, null, appId.toString()); + Assert.assertNotNull(responseRUNNING); + + ApplicationStatisticsInfo responseGet = interceptor.getAppStatistics( + null, null, null); + Assert.assertNotNull(responseGet); + Assert.assertTrue(!responseGet.getStatItems().isEmpty()); + Assert.assertEquals(1, responseGet.getStatItems().get(0).getCount()); + Assert.assertEquals(YarnApplicationState.RUNNING, responseGet.getStatItems().get(0).getState()); + Assert.assertEquals("MapReduce", responseGet.getStatItems().get(0).getType()); + // The merge operations is tested in TestRouterWebServiceUtil + } + + /** + * This test validates the correctness of getAppStatistics in case each + * SubCluster provided a AppStatistics with appsSubmitted set to the + * SubClusterId. The expected result would be appSubmitted equals to 4. + * SubClusterId in this case is an integer. + */ + @Test + public void testGetMultiSubClusterAppStatistics() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + AppState appStateRUNNING = new AppState("RUNNING"); + for (int i = 0; i < NUM_SUBCLUSTER; i ++ ) { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), i); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + // Submit the application we are going to kill later + Assert.assertNotNull(interceptor.submitApplication(context, null)); + Response responseRUNNING = + interceptor.updateAppState(appStateRUNNING, null, appId.toString()); + Assert.assertNotNull(responseRUNNING); + } + + AppState appStateFINISHED = new AppState("FINISHED"); + for (int i = 0; i < NUM_SUBCLUSTER; i ++ ) { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), i); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + // Submit the application we are going to success later + Response response = interceptor.submitApplication(context, null); + Assert.assertNotNull(response); + Response responseSuccess = + interceptor.updateAppState(appStateFINISHED, null, appId.toString()); + Assert.assertNotNull(responseSuccess); + } + + Thread.sleep(100); + ApplicationStatisticsInfo responseGet = interceptor.getAppStatistics( + null, null, null); + Assert.assertNotNull(responseGet); + Assert.assertTrue(!responseGet.getStatItems().isEmpty()); + boolean bCaseKilled = false; + boolean bCaseFinished = false; + for (StatisticsItemInfo item : responseGet.getStatItems()) { + if (YarnApplicationState.FINISHED.equals(item.getState())) { + Assert.assertEquals(NUM_SUBCLUSTER, item.getCount()); + Assert.assertEquals(YarnApplicationState.FINISHED, item.getState()); + bCaseKilled = true; + } else if (YarnApplicationState.RUNNING.equals(item.getState())) { + Assert.assertEquals(NUM_SUBCLUSTER, item.getCount()); + Assert.assertEquals(YarnApplicationState.RUNNING, item.getState()); + bCaseFinished = true; + } + } + Assert.assertTrue(bCaseKilled && bCaseFinished); + // The merge operations is tested in TestRouterWebServiceUtil + } + + /** + * This test validates the correctness of getAppState in case each + * SubCluster provided a AppState with appsSubmitted set to the + * SubClusterId. The expected result would be appSubmitted equals to 1 + * and the Appstate is FAILED. + * SubClusterId in this case is an integer. + */ + @Test + public void testGetAppState() throws IOException, InterruptedException, YarnException { + AppState appStateFailed = new AppState(YarnApplicationState.FAILED.name()); + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + // Submit the application we are going to kill later + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + Response responseFailed = + interceptor.updateAppState(appStateFailed, null, appId.toString()); + Assert.assertNotNull(responseFailed); + + AppState state = interceptor.getAppState(null, appId.toString()); + Assert.assertEquals(state.getState(), appStateFailed.getState()); + } + + /** + * This test validates the correctness of getNodeToLabels. + * The expected result would be 2 nodes, the first node is CPU label and + * the second node is GPU label. + */ + @Test + public void testGetNodeToLabels () throws IOException { + NodeToLabelsInfo info = interceptor.getNodeToLabels(null); + HashMap map = info.getNodeToLabels(); + Assert.assertNotNull(map); + Assert.assertEquals(map.size(), 2); + boolean bCaseNode1 = false; + boolean bCaseNode2 = false; + for (HashMap.Entry item: map.entrySet()){ + if (item.getKey().equals("node1")) { + Assert.assertEquals(item.getValue().getNodeLabelsName().size(), 1); + Assert.assertEquals("CPU", item.getValue().getNodeLabelsName().get(0)); + bCaseNode1 = true; + } else if (item.getKey().equals("node2")) { + Assert.assertEquals(item.getValue().getNodeLabelsName().size(), 1); + Assert.assertEquals("GPU", item.getValue().getNodeLabelsName().get(0)); + bCaseNode2 = true; + } + } + + Assert.assertTrue(bCaseNode1 && bCaseNode2); + } + + /** + * This test validates the correctness of getApplicationPriority. + * The expected result would be equal to set priority. + */ + @Test + public void testGetApplicationPriority() throws IOException, InterruptedException, YarnException { + int iPriority = 101; + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setPriority(iPriority); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + // Get Priority by application + AppPriority priority = interceptor.getAppPriority(null, appId.toString()); + Assert.assertNotNull(priority); + Assert.assertEquals(priority.getPriority(), iPriority); + } + + /** + * This test validates the correctness of updateApplicationPriority. + * The expected result would be equal to update priority. + */ + @Test + public void testUpdateApplicationPriority() throws IOException, InterruptedException, YarnException { + + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setPriority(20); + // Submit the application we are going to kill later + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + int iPriority = 10; + // Set Priority for application + Response response = interceptor.updateApplicationPriority(new AppPriority(iPriority), null, + appId.toString()); + + Assert.assertNotNull(response); + // Get Priority by application + AppPriority priority = interceptor.getAppPriority(null, appId.toString()); + Assert.assertNotNull(priority); + Assert.assertEquals(priority.getPriority(), iPriority); + } + + /** + * This test validates the correctness of getAppQueue. + * The expected result would be equal to set queue. + */ + @Test + public void testGetAppQueue() throws IOException, InterruptedException, YarnException { + String queueName = "queueName"; + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setQueue(queueName); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + // Get Priority by application + AppQueue queue = interceptor.getAppQueue(null, appId.toString()); + Assert.assertNotNull(queue); + Assert.assertEquals(queue.getQueue(), queueName); + } + + /** + * This test validates the correctness of updateAppQueue. + * The expected result would be equal to update queue. + */ + @Test + public void testUpdateAppQueue() throws IOException, InterruptedException, YarnException { + + String oldQueue = "oldQueue"; + String newQueue = "newQueue"; + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setPriority(20); + context.setQueue(oldQueue); + // Submit the application + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + int iPriority = 10; + // Set Priority for application + Response response = interceptor.updateAppQueue(new AppQueue(newQueue), null, + appId.toString()); + + Assert.assertNotNull(response); + // Get Priority by application + AppQueue queue = interceptor.getAppQueue(null, appId.toString()); + Assert.assertNotNull(queue); + Assert.assertEquals(queue.getQueue(), newQueue); + } + + /** + * This test validates the correctness of getAppTimeout. + * The expected result would be equal to mockApp timeout. + */ + @Test + public void testGetAppTimeout() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + // Get Priority by application + AppTimeoutInfo timeoutInfo = interceptor.getAppTimeout(null, appId.toString(), + ApplicationTimeoutType.LIFETIME.toString()); + Assert.assertNotNull(timeoutInfo); + Assert.assertEquals(timeoutInfo.getTimeoutType().toString(), + ApplicationTimeoutType.LIFETIME.toString()); + } + + /** + * This test validates the correctness of getAppTimeouts. + * The expected result would be equal to mockApp timeouts. + */ + @Test + public void testGetAppTimeouts() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + // Get Priority by application + AppTimeoutsInfo timeoutsInfo = interceptor.getAppTimeouts(null, appId.toString()); + Assert.assertNotNull(timeoutsInfo); + Assert.assertTrue(!timeoutsInfo.getAppTimeouts().isEmpty()); + } + + /** + * This test validates the correctness of updateAppTimeout. + * The expected result would be equal to update timeout. + */ + @Test + public void testUpdateApplicationTimeout() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + String timeout = "1111111111111111"; + AppTimeoutInfo info = new AppTimeoutInfo(); + info.setExpiryTime(timeout); + info.setTimeoutType(ApplicationTimeoutType.LIFETIME); + Response response = interceptor.updateApplicationTimeout(info,null, appId.toString()); + Assert.assertNotNull(response); + + AppTimeoutInfo timeoutInfo = interceptor.getAppTimeout(null, appId.toString(), + ApplicationTimeoutType.LIFETIME.toString()); + Assert.assertEquals(timeoutInfo.getExpireTime().toString(), timeout); + } + + /** + * This test validates the correctness of getAppAttempts. + * The expected result would be not null. + */ + @Test + public void testGetAppAttempts() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, + appId.toString()); + Assert.assertNotNull(appAttemptsInfo); + + Assert.assertTrue(!appAttemptsInfo.getAttempts().isEmpty()); + } + + /** + * This test validates the correctness of getAppAttempt. + * The expected result would be not null. + */ + @Test + public void testGetAppAttempt() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + Assert.assertNotNull(interceptor.submitApplication(context, null)); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo + appAttemptInfo = interceptor.getAppAttempt(null, null, + appId.toString(), appAttemptId.toString()); + Assert.assertNotNull(appAttemptInfo); + + Assert.assertEquals(appAttemptInfo.getAppAttemptId(), appAttemptId.toString()); + } + + /** + * This test validates the correctness of getContainers. + * The expected result would be not null. + */ + @Test + public void testGetContainers() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + Assert.assertNotNull(interceptor.submitApplication(context, null)); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainersInfo containersInfo = interceptor.getContainers(null, null, + appId.toString(), appAttemptId.toString()); + Assert.assertNotNull(containersInfo); + } + + /** + * This test validates the correctness of getContainer. + * The expected result would be not null. + */ + @Test + public void testGetContainer() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + Assert.assertNotNull(interceptor.submitApplication(context, null)); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerInfo containerInfo = interceptor.getContainer(null, null, + appId.toString(), appAttemptId.toString(), "containerId"); + Assert.assertNotNull(containerInfo); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java index edf3804..01cd8dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.HashSet; +import java.util.HashMap; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -30,6 +32,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; import org.junit.Test; @@ -579,4 +585,158 @@ private void setUpClusterMetrics(ClusterMetricsInfo metrics, long seed) { metrics.setActiveNodes(rand.nextInt(1000)); metrics.setShutdownNodes(rand.nextInt(1000)); } + + /** + * This test validates the correctness of RouterWebServiceUtil#mergeApplicationStatisticsInfo + * in case we want to merge 2 ApplicationStatisticsInfo. The + * excepted result would be the same report. + */ + @Test + public void testMergeApplicationStatisticsInfo() { + ApplicationStatisticsInfo infoA = new ApplicationStatisticsInfo(); + ApplicationStatisticsInfo infoB = new ApplicationStatisticsInfo(); + StatisticsItemInfo item1 = new StatisticsItemInfo(YarnApplicationState.ACCEPTED, + "*", 10); + StatisticsItemInfo item2 = new StatisticsItemInfo(YarnApplicationState.ACCEPTED, + "*", 20); + + infoA.add(item1); + infoB.add(item2); + + ApplicationStatisticsInfo mergeInfo = RouterWebServiceUtil.mergeApplicationStatisticsInfo(infoA, infoB); + + Assert.assertEquals(1, mergeInfo.getStatItems().size()); + Assert.assertEquals(item1.getCount() + item2.getCount(), + mergeInfo.getStatItems().get(0).getCount()); + Assert.assertEquals(item1.getType(), + mergeInfo.getStatItems().get(0).getType()); + Assert.assertEquals(item1.getState(), + mergeInfo.getStatItems().get(0).getState()); + } + + /** + * This test validates the correctness of RouterWebServiceUtil#mergeApplicationStatisticsInfo + * in case we want to merge 4 different ApplicationStatisticsInfo. (diff type and diff state) + * The excepted result would be the 4 report. + */ + @Test + public void testMergeDiffApplicationStatisticsInfo() { + ApplicationStatisticsInfo infoA = new ApplicationStatisticsInfo(); + ApplicationStatisticsInfo infoB = new ApplicationStatisticsInfo(); + StatisticsItemInfo item1 = new StatisticsItemInfo(YarnApplicationState.ACCEPTED, + "*", 10); + StatisticsItemInfo item2 = new StatisticsItemInfo(YarnApplicationState.NEW_SAVING, + "xxxx1", 20); + + StatisticsItemInfo item3 = new StatisticsItemInfo(YarnApplicationState.NEW_SAVING, + "xxxx1", 30); + StatisticsItemInfo item4 = new StatisticsItemInfo(YarnApplicationState.FINISHED, + "xxxx3", 40); + + infoA.add(item1); + infoA.add(item2); + + infoB.add(item3); + infoB.add(item4); + + ApplicationStatisticsInfo mergeInfo = RouterWebServiceUtil.mergeApplicationStatisticsInfo(infoA, infoB); + + Assert.assertEquals(3, mergeInfo.getStatItems().size()); + ArrayList statisticsItemInfos = mergeInfo.getStatItems(); + boolean bCaseA = false; + boolean bCaseB = false; + boolean bCaseC = false; + for (StatisticsItemInfo info : statisticsItemInfos) { + if (info.getState().equals(YarnApplicationState.ACCEPTED)) { + Assert.assertEquals(info.getType(), item1.getType()); + Assert.assertEquals(info.getCount(), item1.getCount()); + bCaseA = true; + } else if (info.getState().equals(YarnApplicationState.NEW_SAVING)) { + Assert.assertEquals(item2.getCount() + item3.getCount(), + info.getCount()); + Assert.assertEquals(item2.getType(), info.getType()); + bCaseB = true; + } else { + Assert.assertEquals(item4.getState(), info.getState()); + Assert.assertEquals(item4.getCount(), info.getCount()); + Assert.assertEquals(item4.getType(), info.getType()); + bCaseC = true; + } + } + Assert.assertTrue(bCaseA && bCaseB && bCaseC); + } + + /** + * This test validates the correctness of RouterWebServiceUtil#mergeNodeToLabels + * in case we want to merge 2 different SubCluster and each cluster have 2 node. + * The excepted result would be the 4 node report. + */ + @Test + public void testMergeNodeToLabels() { + HashSet labelsA = new HashSet<>(); + labelsA.add("A_CPU"); + labelsA.add("B_CPU"); + labelsA.add("C_CPU"); + HashSet labelsB = new HashSet<>(); + labelsB.add("D_CPU"); + labelsB.add("E_CPU"); + labelsB.add("F_CPU"); + + NodeToLabelsInfo infoA = new NodeToLabelsInfo(); + HashMap nodeToLabelsA = new HashMap<>(); + nodeToLabelsA.put("A_Cluster_NODEA", new NodeLabelsInfo(labelsA)); + nodeToLabelsA.put("A_Cluster_NODEB", new NodeLabelsInfo(labelsB)); + infoA.setNodeToLabels(nodeToLabelsA); + + + NodeToLabelsInfo infoB = new NodeToLabelsInfo(); + HashMap nodeToLabelsB = new HashMap<>(); + nodeToLabelsB.put("B_Cluster_NODEA", new NodeLabelsInfo(labelsA)); + nodeToLabelsB.put("B_Cluster_NODEB", new NodeLabelsInfo(labelsB)); + infoB.setNodeToLabels(nodeToLabelsB); + + NodeToLabelsInfo result = RouterWebServiceUtil.mergeNodeToLabels(infoA, infoB); + + boolean bCase1 = false; + boolean bCase2 = false; + boolean bCase3 = false; + boolean bCase4 = false; + + Assert.assertEquals(result.getNodeToLabels().size(),4); + for (HashMap.Entry item : result.getNodeToLabels().entrySet()) { + Assert.assertEquals(item.getValue().getNodeLabels().size(), 3); + if (item.getKey().equals("A_Cluster_NODEA")) { + Assert.assertEquals(item.getValue().getNodeLabels().size(), labelsA.size()); + Assert.assertTrue(validateNodeToLabelsInfo(item.getValue().getNodeLabelsName(), + labelsA)); + bCase1 = true; + } else if (item.getKey().equals("A_Cluster_NODEB")) { + Assert.assertEquals(item.getValue().getNodeLabels().size(), labelsB.size()); + Assert.assertTrue(validateNodeToLabelsInfo(item.getValue().getNodeLabelsName(), + labelsB)); + bCase2 = true; + } else if (item.getKey().equals("B_Cluster_NODEA")) { + Assert.assertEquals(item.getValue().getNodeLabels().size(), labelsA.size()); + Assert.assertTrue(validateNodeToLabelsInfo(item.getValue().getNodeLabelsName(), + labelsA)); + bCase3 = true; + } else if (item.getKey().equals("B_Cluster_NODEB")) { + Assert.assertEquals(item.getValue().getNodeLabels().size(), labelsB.size()); + Assert.assertTrue(validateNodeToLabelsInfo(item.getValue().getNodeLabelsName(), + labelsB)); + bCase4 = true; + } + } + + Assert.assertTrue(bCase1 && bCase2 && bCase3 && bCase4); + } + + private boolean validateNodeToLabelsInfo(List left, HashSet right) { + for (String item : left) { + if (!right.contains(item)) { + return false; + } + } + return true; + } } \ No newline at end of file