diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 350f4a32ca6..1de396cda2d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -198,6 +198,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { return null; } + @Override + public Map getAggregatedAppUtilizations() { + return null; + } + @Override public ResourceUtilization getNodeUtilization() { return null; diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index bb6fb9d8632..2703c409d0e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -224,6 +224,11 @@ public RMContext getRMContext() { return node.getRMContext(); } + @Override + public Map getAggregatedAppUtilizations() { + return node.getAggregatedAppUtilizations(); + } + @Override public Resource getPhysicalResource() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index 2ae4872fbcd..21d2538ef29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -156,6 +156,20 @@ public void addTo(int pmem, int vmem, float 
cpu) { this.setCPU(this.getCPU() + cpu); } + /** + * Add utilization to the current one. + * @param resUtil Resource Utilization to add. + */ + @Public + @Unstable + public void addTo(ResourceUtilization resUtil) { + this.setPhysicalMemory( + this.getPhysicalMemory() + resUtil.getPhysicalMemory()); + this.setVirtualMemory( + this.getVirtualMemory() + resUtil.getVirtualMemory()); + this.setCPU(this.getCPU() + resUtil.getCPU()); + } + /** * Subtract utilization from the current one. * @param pmem Physical memory to be subtracted. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 440cd0a2902..87586d9ef33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -19,6 +19,7 @@ import java.util.List; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -132,4 +133,22 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus); + + /** + * Set per application ResourceUtilization. + * @param applicationUtilizations Per application utilization map. + */ + @Private + @Unstable + public abstract void setApplicationUtilizations( + Map applicationUtilizations); + + /** + * Get per application ResourceUtilization. + * @return Per application utilizations map. 
+ */ + @Private + @Unstable + public abstract Map + getApplicationUtilizations(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8aebc6fa913..a40e1fbef9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -20,9 +20,11 @@ import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,6 +40,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.AppResourceUtilizationProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.AppResourceUtilizationProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; @@ -57,6 +61,7 @@ private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; private List increasedContainers = null; + private Map appUtilizations = null; public NodeStatusPBImpl() { builder = 
NodeStatusProto.newBuilder(); @@ -90,6 +95,9 @@ private synchronized void mergeLocalToBuilder() { if (this.increasedContainers != null) { addIncreasedContainersToProto(); } + if (this.appUtilizations != null) { + addAppUtilizations(); + } } private synchronized void mergeLocalToProto() { @@ -107,6 +115,22 @@ private synchronized void maybeInitBuilder() { } viaProto = false; } + private void addAppUtilizations() { + maybeInitBuilder(); + builder.clearApplicationUtilizations(); + if (this.appUtilizations == null) + return; + List protoList = + new ArrayList<>(); + + for (Map.Entry entry : this.appUtilizations + .entrySet()) { + protoList.add(AppResourceUtilizationProto.newBuilder() + .setApplicationId(convertToProtoFormat(entry.getKey())) + .setUtilization(convertToProtoFormat(entry.getValue())).build()); + } + builder.addAllApplicationUtilizations(protoList); + } private synchronized void addContainersToProto() { maybeInitBuilder(); @@ -425,6 +449,36 @@ public synchronized void setOpportunisticContainersStatus( convertToProtoFormat(opportunisticContainersStatus)); } + private synchronized void initAppUtilizations() { + if (this.appUtilizations != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; + List protoList = p.getApplicationUtilizationsList(); + this.appUtilizations = new HashMap<>(); + for (AppResourceUtilizationProto au : protoList) { + this.appUtilizations.put(convertFromProtoFormat(au.getApplicationId()), + convertFromProtoFormat(au.getUtilization())); + } + } + + @Override + public synchronized void setApplicationUtilizations( + Map applicationUtilizations) { + maybeInitBuilder(); + if (applicationUtilizations == null) { + builder.clearApplicationUtilizations(); + } + this.appUtilizations = applicationUtilizations; + } + + @Override + public synchronized Map + getApplicationUtilizations() { + initAppUtilizations(); + return this.appUtilizations; + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 82008081551..30a1e292e05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -40,6 +40,12 @@ message NodeStatusProto { optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; optional OpportunisticContainersStatusProto opportunistic_containers_status = 9; + repeated AppResourceUtilizationProto application_utilizations = 11; +} + +message AppResourceUtilizationProto { + required ApplicationIdProto application_id = 1; + optional ResourceUtilizationProto utilization = 3; } message OpportunisticContainersStatusProto { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index d7573762763..e09fe0831eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -514,6 +514,8 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { List containersStatuses = getContainerStatuses(); ResourceUtilization containersUtilization = getContainersUtilization(); ResourceUtilization nodeUtilization = getNodeUtilization(); + Map appUtilizations = + getAppUtilizations(); List increasedContainers = getIncreasedContainers(); NodeStatus nodeStatus = @@ -523,6 +525,7 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); + nodeStatus.setApplicationUtilizations(appUtilizations); return nodeStatus; } @@ -546,6 +549,12 @@ private ResourceUtilization getContainersUtilization() { return containersMonitor.getContainersUtilization(false).getUtilization(); } + private Map getAppUtilizations() { + ContainersMonitor containersMonitor = + this.context.getContainerManager().getContainersMonitor(); + return containersMonitor.getAppUtilizations(false).getUtilizations(); + } + /** * Get the utilization of the node. This includes the containers. * @return Resource utilization of the node. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 8da4ec4c411..c2193ec66cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import java.util.Map; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; @@ -36,6 +38,14 @@ */ ContainersResourceUtilization getContainersUtilization(boolean latest); + /** + * Get the per app aggregate resource utilization of containers running on + * the node. + * @param latest true if the latest result should be returned. + * @return AppResourceUtilizations per app resource utilization. + */ + AppResourceUtilizations getAppUtilizations(boolean latest); + /** * Get the policy to over-allocate containers when over-allocation is on. * @return null if over-allocation is turned off @@ -102,4 +112,27 @@ public ResourceUtilization getUtilization() { return utilization; } } + + /** + * A snapshot of per-application aggregate resource utilization with the timestamp. 
+ */ + final class AppResourceUtilizations { + private final Map utilizations; + private final long timestamp; + + public AppResourceUtilizations( + Map utilizations, + long timestamp) { + this.utilizations = utilizations; + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public Map getUtilizations() { + return utilizations; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index c36dfd483be..f1c1b43c4a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -20,7 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.HashMap; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler; @@ -122,6 +124,7 @@ } private ContainersResourceUtilization latestContainersUtilization; + private AppResourceUtilizations latestAppUtilizations; private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; @@ -622,6 +625,9 @@ 
public void run() { long vmemUsageByAllContainers = 0; long pmemByAllContainers = 0; long cpuUsagePercentPerCoreByAllContainers = 0; + + Map perAppResourceUtilization = + new HashMap<>(); for (Entry entry : trackingContainers .entrySet()) { ContainerId containerId = entry.getKey(); @@ -657,7 +663,8 @@ public void run() { } recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage, - currentPmemUsage, trackedContainersUtilization); + currentPmemUsage, trackedContainersUtilization, + perAppResourceUtilization); checkLimit(containerId, pId, pTree, ptInfo, currentVmemUsage, currentPmemUsage); @@ -686,6 +693,8 @@ public void run() { // Save the aggregated utilization of the containers setLatestContainersUtilization(trackedContainersUtilization); + // Save the aggregated app utilizations + setLatestAppUtilizations(perAppResourceUtilization); // check opportunity to start containers if over-allocation is on checkUtilization(); @@ -781,7 +790,8 @@ private void recordUsage(ContainerId containerId, String pId, ResourceCalculatorProcessTree pTree, ProcessTreeInfo ptInfo, long currentVmemUsage, long currentPmemUsage, - ResourceUtilization trackedContainersUtilization) { + ResourceUtilization trackedContainersUtilization, + Map perAppUtil) { // if machine has 6 cores and 3 are used, // cpuUsagePercentPerCore should be 300% and // cpuUsageTotalCoresPercentage should be 50% @@ -806,12 +816,21 @@ private void recordUsage(ContainerId containerId, String pId, cpuUsageTotalCoresPercentage)); } - // Add resource utilization for this container - trackedContainersUtilization.addTo( + ResourceUtilization currResUtil = + ResourceUtilization.newInstance( (int) (currentPmemUsage >> 20), (int) (currentVmemUsage >> 20), milliVcoresUsed / 1000.0f); + // Add resource utilization for this container + trackedContainersUtilization.addTo(currResUtil); + + ResourceUtilization appUtil = + perAppUtil.computeIfAbsent( + containerId.getApplicationAttemptId().getApplicationId(), + (x -> 
ResourceUtilization.newInstance(0, 0, 0.0f))); + appUtil.addTo(currResUtil); + // Add usage to container metrics if (containerMetricsEnabled) { ContainerMetrics.forContainer( @@ -1084,6 +1103,13 @@ public ContainersResourceUtilization getContainersUtilization( return this.latestContainersUtilization; } + @Override + public AppResourceUtilizations getAppUtilizations(boolean latest) { + // TODO If latest is true, kickoff an immediate app utilization + // and return value. + return this.latestAppUtilizations; + } + @Override public NMAllocationPolicy getContainerOverAllocationPolicy() { return overAllocationPolicy; @@ -1098,6 +1124,12 @@ private void setLatestContainersUtilization(ResourceUtilization utilization) { utilization, Time.now()); } + private void setLatestAppUtilizations( + Map appUtilization) { + this.latestAppUtilizations = new AppResourceUtilizations( + appUtilization, Time.now()); + } + /** * Check the resource utilization of the node. If the utilization is below * the over-allocation threshold, {@link ContainerScheduler} is notified to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 16f019f3c84..7883d516ad2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -206,6 +206,7 @@ private final String zkRootNodePassword = Long.toString(new SecureRandom().nextLong()); private boolean recoveryEnabled; + protected ResourceUtilizationAggregator resUtilAggregator; 
@VisibleForTesting protected String webAppAddress; @@ -344,6 +345,8 @@ protected void serviceInit(Configuration conf) throws Exception { rmContext.setSystemMetricsPublisher(systemMetricsPublisher); registerMXBean(); + this.resUtilAggregator = new ResourceUtilizationAggregator(rmContext); + addService(this.resUtilAggregator); super.serviceInit(this.conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java new file mode 100644 index 00000000000..b0e0748683a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; + + +/** + * This computes a snapshot of aggregated actual resource utilization across + * Applications, Users and Queues. Queue aggregation will be performed + * only at the LeafQueue level. + * The snapshot calculation interval is set to the Node heartbeat interval. + * It is assumed that all nodes would have heartbeat-ed to the RM in that + * interval. 
+ */ +public class ResourceUtilizationAggregator extends AbstractService { + + private static final Log LOG = + LogFactory.getLog(ResourceUtilizationAggregator.class); + + final RMContext rmContext; + final ScheduledExecutorService scheduledExecutor; + private volatile Map stalePerAppUtilization = + new HashMap<>(); + private volatile Map stalePerUserUtilization = + new HashMap<>(); + private volatile Map stalePerQueueUtilization = + new HashMap<>(); + + private AggregationTask aggTask = null; + + final class AggregationTask implements Runnable { + @Override + public void run() { + ConcurrentMap rmNodes = rmContext.getRMNodes(); + Map perAppUtilization = + new HashMap<>(); + Map perUserUtilization = + new HashMap<>(); + Map perQueueUtilization = + new HashMap<>(); + rmNodes.values().stream() + .filter(n -> !n.getState().isUnusable()) + .forEach(rmNode -> { + Map aggAppUtilizations = + rmNode.getAggregatedAppUtilizations(); + if (aggAppUtilizations != null) { + aggAppUtilizations.forEach((appId, appResUtilPerNode) -> { + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp != null) { + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getApplicationAttempt( + rmApp.getCurrentAppAttempt().getAppAttemptId()); + if (appAttempt != null) { + Queue queue = appAttempt.getQueue(); + perQueueUtilization.computeIfAbsent(queue, + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))) + .addTo(appResUtilPerNode); + perAppUtilization.computeIfAbsent(appId, + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))) + .addTo(appResUtilPerNode); + String user = appAttempt.getUser(); + if (user != null) { + perUserUtilization.computeIfAbsent(user, + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))) + .addTo(appResUtilPerNode); + } else { + LOG.warn("No user found for application attempt " + + "[" + appAttempt.getApplicationAttemptId() + "] !!"); + } + } else { + LOG.warn("No App Attempt for application " + + "[" + appId + "] found 
!!"); + } + } else { + LOG.warn("Invalid Application " + + "[" + appId + "] received !!"); + } + }); + } + }); + stalePerAppUtilization = perAppUtilization; + stalePerQueueUtilization = perQueueUtilization; + stalePerUserUtilization = perUserUtilization; + } + } + + /** + * Construct the service. + */ + public ResourceUtilizationAggregator(RMContext rmContext) { + super("Resource Utilization Aggregator"); + this.rmContext = rmContext; + this.scheduledExecutor = new HadoopScheduledThreadPoolExecutor(1); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + long aggInterval = conf.getLong( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + this.aggTask = new AggregationTask(); + this.scheduledExecutor.scheduleAtFixedRate(aggTask, + aggInterval, aggInterval, TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + void kickoffAggregation() { + this.aggTask.run(); + } + + /** + * Return aggregated Resource Utilization for the User. + * @param user User. + * @return Resource Utilization. + */ + public ResourceUtilization getUserResourceUtilization(String user) { + return stalePerUserUtilization.computeIfAbsent(user, + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))); + } + + /** + * Return aggregated Resource Utilization for the Queue. Currently, + * user is expected to provide the Leaf Queue. Aggregation across + * the queue hierarchy is not supported since queue traversal is + * not consistent across schedulers. + * @param queue Queue. + * @return Resource Utilization. + */ + public ResourceUtilization getQueueResourceUtilization(Queue queue) { + return stalePerQueueUtilization.computeIfAbsent(queue, + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))); + } + + /** + * Return aggregated Resource Utilization for the application. + * @param applicationId Application Id. + * @return Resource Utilization. 
+ */ +public ResourceUtilization getAppResourceUtilization( + ApplicationId applicationId) { + return stalePerAppUtilization.computeIfAbsent(applicationId, + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index aa194837163..aca750be8f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -111,6 +111,14 @@ */ public ResourceUtilization getAggregatedContainersUtilization(); + /** + * The per-app aggregated utilization of the containers running + * on the node. + * @return the aggregated per-app container utilization. + */ + public Map + getAggregatedAppUtilizations(); + /** * the total resource utilization of the node. * @return the total resource utilization of the node. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0e32f1ebbb5..5d5172776f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -138,6 +138,8 @@ private ResourceUtilization containersUtilization; /* Resource utilization for the node. */ private ResourceUtilization nodeUtilization; + /* Per app aggregate utilization. */ + private Map appUtilizations; /** Physical resources in the node. 
*/ private volatile Resource physicalResource; @@ -508,9 +510,28 @@ public String getNodeManagerVersion() { } @Override - public ResourceUtilization getAggregatedContainersUtilization() { + public Map getAggregatedAppUtilizations() { this.readLock.lock(); + try { + return this.appUtilizations; + } finally { + this.readLock.unlock(); + } + } + + public void setAggregatedAppUtilizations( + Map appUtilizations) { + this.writeLock.lock(); + try { + this.appUtilizations = appUtilizations; + } finally { + this.writeLock.unlock(); + } + } + @Override + public ResourceUtilization getAggregatedContainersUtilization() { + this.readLock.lock(); try { return this.containersUtilization; } finally { @@ -830,6 +851,8 @@ private static NodeHealthStatus updateRMNodeFromStatusEvents( rmNode.setAggregatedContainersUtilization(statusEvent .getAggregatedContainersUtilization()); rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); + rmNode.setAggregatedAppUtilizations( + statusEvent.getAggregateAppUtilization()); return remoteNodeHealthStatus; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 5f5fe24d173..e799ba4ed46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -63,6 +64,10 @@ public ResourceUtilization getAggregatedContainersUtilization() { return this.nodeStatus.getContainersUtilization(); } + public Map getAggregateAppUtilization() { + return this.nodeStatus.getApplicationUtilizations(); + } + public ResourceUtilization getNodeUtilization() { return this.nodeStatus.getNodeUtilization(); } @@ -79,7 +84,7 @@ public void setLogAggregationReportsForApps( List logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; } - + public List getNMReportedIncreasedContainers() { return this.nodeStatus.getIncreasedContainers() == null ? Collections.emptyList() : this.nodeStatus.getIncreasedContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e2839552ee..6a0d209fcd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -141,7 +142,7 @@ public void 
containerIncreaseStatus(Container container) throws Exception { container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, responseId); + true, responseId, null); } public void addRegisteringCollector(ApplicationId appId, @@ -211,7 +212,13 @@ public RegisterNodeManagerResponse registerNode( public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.emptyList(), - Collections.emptyList(), isHealthy, responseId); + Collections.emptyList(), isHealthy, responseId, null); + } + + public NodeHeartbeatResponse nodeHeartbeat( + Map appUtil) throws Exception { + return nodeHeartbeat(Collections.emptyList(), + Collections.emptyList(), true, responseId, appUtil); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -224,7 +231,7 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, containerStatusList.add(containerStatus); Log.getLog().info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.emptyList(), true, responseId); + Collections.emptyList(), true, responseId, null); } public NodeHeartbeatResponse nodeHeartbeat(MapemptyList(), - isHealthy, resId); + isHealthy, resId, null); } public NodeHeartbeatResponse nodeHeartbeat( List updatedStats, boolean isHealthy) throws Exception { return nodeHeartbeat(updatedStats, Collections.emptyList(), - isHealthy, responseId); + isHealthy, responseId, null); } public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, - List increasedConts, boolean isHealthy, int resId) - throws Exception { + List increasedConts, boolean isHealthy, int resId, + Map appUtil) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setResponseId(resId); @@ -273,6 +280,11 @@ public 
NodeHeartbeatResponse nodeHeartbeat(List updatedStats, healthStatus.setIsNodeHealthy(isHealthy); healthStatus.setLastHealthReportTime(1); status.setNodeHealthStatus(healthStatus); + + if (appUtil != null && !appUtil.isEmpty()) { + status.setApplicationUtilizations(appUtil); + } + req.setNodeStatus(status); req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey); req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index d841ff0f2b3..60c7abc23c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -121,6 +121,7 @@ public static Resource newAvailResource(Resource total, Resource used) { private Set labels; private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; + private Map appUtilization; private Resource physicalResource; private OverAllocationInfo overAllocationInfo; private List containerUpdates = @@ -330,13 +331,27 @@ public Resource getPhysicalResource() { return this.physicalResource; } + @Override + public Map getAggregatedAppUtilizations() { + return this.appUtilization; + } + public void updateContainersInfoAndUtilization( UpdatedContainerInfo updatedContainerInfo, ResourceUtilization resourceUtilization) { + updateContainersInfoAndUtilization( + updatedContainerInfo, resourceUtilization, null); + } + + public void updateContainersInfoAndUtilization( + 
UpdatedContainerInfo updatedContainerInfo, + ResourceUtilization resourceUtilization, + Map appUtilization) { if (updatedContainerInfo != null) { containerUpdates = Collections.singletonList(updatedContainerInfo); } this.containersUtilization = resourceUtilization; + this.appUtilization = appUtilization; } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java new file mode 100644 index 00000000000..21508a8e529 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestResourceUtilizationAggregator { + + private static final int GB = 1024; + + private MockRM rm; + private DrainDispatcher dispatcher; + MockNM nm1, nm2, nm3, nm4; + + @Before + public void createAndStartRM() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + 
ResourceScheduler.class); + conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + startRM(conf); + + nm1 = new MockNM("h1:1234", 8 * GB, rm.getResourceTrackerService()); + nm2 = new MockNM("h1:4321", 8 * GB, rm.getResourceTrackerService()); + nm3 = new MockNM("h2:1234", 8 * GB, rm.getResourceTrackerService()); + nm4 = new MockNM("h2:4321", 8 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + } + + private void startRM(final YarnConfiguration conf) { + dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + } + + @After + public void stopRM() { + if (rm != null) { + rm.stop(); + } + } + + /** + * Check if Resource Utilization Aggregation works correctly. + * Start 3 Apps across 4 nodes : 2 apps by 'user1' and 1 by 'user2' + * .. but all on the same queue. + * + * Step 1: Send Node Heartbeats with App Resource Utilization. + * Ensure the Resource utilization is correctly aggregated across + * apps, users and queues. + * + * Step 2: Resend Node Heartbeats with Increase in one App's Utilization + * Ensure the Resource utilization is correctly aggregated across + * apps, users and queues. + * + * Step 3: Resend Node Heartbeats with Decrease in utilization across + * all apps. Ensure the Resource utilization is correctly aggregated across + * apps, users and queues.
+ * + * @throws Exception + */ + @Test(timeout = 60000) + public void testResourceUtilizationAggregation() throws Exception { + + RMApp app1 = rm.submitApp(1 * GB, "app1", "user1", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + RMApp app2 = rm.submitApp(1 * GB, "app2", "user2", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm3); + + RMApp app3 = rm.submitApp(1 * GB, "app3", "user2", null, "default"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm4); + + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))), + null); + + am2.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))), + null); + + am3.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))), + null); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + rm.drainEvents(); + dispatcher.waitForEventThreadToWait(); + + ResourceUtilizationAggregator resUtilAggregator = + rm.getRMContext().getResourceManager().resUtilAggregator; + + AllocateResponse alloc1 = am1.allocate( + new ArrayList<>(), new ArrayList<>()); + assertEquals(1, alloc1.getAllocatedContainers().size()); + + AllocateResponse alloc2 = am2.allocate( + new ArrayList<>(), new ArrayList<>()); + assertEquals(1, alloc2.getAllocatedContainers().size()); + + AllocateResponse alloc3 = am3.allocate( + new ArrayList<>(), new ArrayList<>()); + assertEquals(1, alloc3.getAllocatedContainers().size()); + + // START Step 1 ========> + // Send Node Heartbeats with App Resource Utilization. 
+ // Ensure the Resource utilization is correctly aggregated across + // apps, users and queues. + sendHeartBeatsWithAppUtil( + mkmap( + e(app1.getApplicationId(), ResourceUtilization.newInstance(1, 1, 0.1f)), + e(app2.getApplicationId(), ResourceUtilization.newInstance(3, 3, 0.3f)) + ), + mkmap( + e(app1.getApplicationId(), ResourceUtilization.newInstance(2, 2, 0.2f)) + ), + mkmap( + e(app3.getApplicationId(), ResourceUtilization.newInstance(5, 5, 0.5f)) + ), + mkmap( + e(app2.getApplicationId(), ResourceUtilization.newInstance(4, 4, 0.4f)) + )); + + AbstractYarnScheduler sched = + (AbstractYarnScheduler) rm.getRMContext().getScheduler(); + SchedulerApplicationAttempt appAttempt1 = + sched.getApplicationAttempt( + app1.getCurrentAppAttempt().getAppAttemptId()); + SchedulerApplicationAttempt appAttempt2 = + sched.getApplicationAttempt( + app2.getCurrentAppAttempt().getAppAttemptId()); + + ResourceUtilization aRU1 = + resUtilAggregator.getAppResourceUtilization(app1.getApplicationId()); + ResourceUtilization aRU2 = + resUtilAggregator.getAppResourceUtilization(app2.getApplicationId()); + ResourceUtilization aRU3 = + resUtilAggregator.getAppResourceUtilization(app3.getApplicationId()); + + ResourceUtilization uRU1 = + resUtilAggregator.getUserResourceUtilization(appAttempt1.getUser()); + ResourceUtilization uRU2 = + resUtilAggregator.getUserResourceUtilization(appAttempt2.getUser()); + + // Check aggregated utilization across nodes for + // each application + assertEquals(3, aRU1.getPhysicalMemory()); + assertEquals(3, aRU1.getVirtualMemory()); + assertEquals(7, aRU2.getPhysicalMemory()); + assertEquals(7, aRU2.getVirtualMemory()); + assertEquals(5, aRU3.getPhysicalMemory()); + assertEquals(5, aRU3.getVirtualMemory()); + + // Check aggregated utilization across nodes for + // each user + assertEquals(3, uRU1.getPhysicalMemory()); + assertEquals(3, uRU1.getVirtualMemory()); + assertEquals(12, uRU2.getPhysicalMemory()); + assertEquals(12, uRU2.getVirtualMemory()); + + 
assertEquals(appAttempt1.getQueue(), appAttempt2.getQueue()); + + // All three applications are bound to the same queue, + // so the queue utilization should be the total aggregate.. + ResourceUtilization qRU = + resUtilAggregator.getQueueResourceUtilization(appAttempt1.getQueue()); + assertEquals(15, qRU.getPhysicalMemory()); + assertEquals(15, qRU.getVirtualMemory()); + // <======== END Step 1 + + // START Step 2 ========> + // Resend Node Heartbeats with Increase in one App's Utilization + // Ensure the Resource utilization is correctly aggregated across + // apps, users and queues. + sendHeartBeatsWithAppUtil( + mkmap( + e(app1.getApplicationId(), ResourceUtilization.newInstance(2, 2, 0.1f)), + e(app2.getApplicationId(), ResourceUtilization.newInstance(3, 3, 0.3f)) + ), + mkmap( + e(app1.getApplicationId(), ResourceUtilization.newInstance(2, 2, 0.2f)) + ), + mkmap( + e(app3.getApplicationId(), ResourceUtilization.newInstance(5, 5, 0.5f)) + ), + mkmap( + e(app2.getApplicationId(), ResourceUtilization.newInstance(4, 4, 0.4f)) + )); + + aRU1 = resUtilAggregator.getAppResourceUtilization(app1.getApplicationId()); + aRU2 = resUtilAggregator.getAppResourceUtilization(app2.getApplicationId()); + aRU3 = resUtilAggregator.getAppResourceUtilization(app3.getApplicationId()); + + uRU1 = resUtilAggregator.getUserResourceUtilization(appAttempt1.getUser()); + uRU2 = resUtilAggregator.getUserResourceUtilization(appAttempt2.getUser()); + + qRU = resUtilAggregator.getQueueResourceUtilization(appAttempt1.getQueue()); + + // App1, User1 and overall Queue utilization should increase + // Everything else should stay the same.. 
+ assertEquals(4, aRU1.getPhysicalMemory()); + assertEquals(7, aRU2.getPhysicalMemory()); + assertEquals(5, aRU3.getPhysicalMemory()); + assertEquals(4, uRU1.getPhysicalMemory()); + assertEquals(12, uRU2.getPhysicalMemory()); + assertEquals(16, qRU.getPhysicalMemory()); + // <======== END Step 2 + + // START Step 3 ========> + // Resend Node Heartbeats with Decrease in utilization across + // all apps. Ensure the Resource utilization is correctly aggregated across + // apps, users and queues. + sendHeartBeatsWithAppUtil( + mkmap( + e(app1.getApplicationId(), ResourceUtilization.newInstance(1, 1, 0.1f)), + e(app2.getApplicationId(), ResourceUtilization.newInstance(1, 1, 0.1f)) + ), + mkmap( + e(app1.getApplicationId(), ResourceUtilization.newInstance(1, 1, 0.1f)) + ), + mkmap( + e(app3.getApplicationId(), ResourceUtilization.newInstance(1, 1, 0.1f)) + ), + mkmap( + e(app2.getApplicationId(), ResourceUtilization.newInstance(1, 1, 0.1f)) + )); + + aRU1 = resUtilAggregator.getAppResourceUtilization(app1.getApplicationId()); + aRU2 = resUtilAggregator.getAppResourceUtilization(app2.getApplicationId()); + aRU3 = resUtilAggregator.getAppResourceUtilization(app3.getApplicationId()); + + uRU1 = resUtilAggregator.getUserResourceUtilization(appAttempt1.getUser()); + uRU2 = resUtilAggregator.getUserResourceUtilization(appAttempt2.getUser()); + + qRU = resUtilAggregator.getQueueResourceUtilization(appAttempt1.getQueue()); + + // All Utilizations should decrease..
+ assertEquals(2, aRU1.getPhysicalMemory()); + assertEquals(2, aRU2.getPhysicalMemory()); + assertEquals(1, aRU3.getPhysicalMemory()); + assertEquals(2, uRU1.getPhysicalMemory()); + assertEquals(3, uRU2.getPhysicalMemory()); + assertEquals(5, qRU.getPhysicalMemory()); + // <======== END Step 3 + } + + private void sendHeartBeatsWithAppUtil( + Map nm1AppUtil, + Map nm2AppUtil, + Map nm3AppUtil, + Map nm4AppUtil) throws Exception{ + nm1.nodeHeartbeat(nm1AppUtil); + nm2.nodeHeartbeat(nm2AppUtil); + nm3.nodeHeartbeat(nm3AppUtil); + nm4.nodeHeartbeat(nm4AppUtil); + + // Wait for scheduler to process all events + rm.drainEvents(); + dispatcher.waitForEventThreadToWait(); + + ResourceUtilizationAggregator resUtilAggregator = + rm.getRMContext().getResourceManager().resUtilAggregator; + resUtilAggregator.kickoffAggregation(); + } + /** + * Utility function to create a map + */ + private static Map mkmap(AbstractMap.SimpleEntry... es) { + return Stream.of(es).collect( + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + /** + * Utility function to create a map entry to be used by above function + */ + private static AbstractMap.SimpleEntry e(K key, V val) { + return new AbstractMap.SimpleEntry<>(key, val); + } +}