diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java index e69de29bb2d..3836e2ee071 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; + +/** + * Queue metrics scoped to a single node partition. Instances are tagged with + * the partition and registered under partition-specific source names. + */ +@Metrics(context = "yarn") +public class PartitionQueueMetrics extends QueueMetrics { + + private final String partition; + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, + String partition) { + super(ms, queueName, parent, enableUserMetrics, conf); + this.partition = partition; + } + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, String partition, + boolean partitionQueueMetrics) { + super(ms, queueName, parent, enableUserMetrics, conf, partitionQueueMetrics); + this.partition = partition; + } + + /** + * Returns the metrics of one user for this partition and queue, creating, + * registering and caching them on first use. + * + * @param userName Name of the user + * @return the user's metrics, or null when user metrics are disabled + */ + @Override + public synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + + String metricName = partition + this.queueName + userName; + QueueMetrics metrics = (PartitionQueueMetrics) users.get(metricName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(this.metricsSystem, + this.queueName, null, false, this.conf, this.partition, false); + users.put(metricName, metrics); + metricsSystem.register(pSourceName(partition).append(qSourceName(queueName)). + append(",user=").append(userName).toString(), + "Metrics for user '" + userName + "' in queue '" + queueName + "'", + ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition).
+ tag(QUEUE_INFO, queueName)).tag(USER_INFO, + userName)); + getQueueMetrics().put(metricName, metrics); + } + return metrics; + } + + /** + * Returns the cached metrics for the given partition, creating them and + * registering them with the metrics system on first use. + * + * @param ms metrics system to register with; registration is skipped if null + * @param queueName queue name passed to a newly created instance + * @param parent parent queue passed to a newly created instance + * @param enableUserMetrics whether a new instance tracks per-user metrics + * @param conf configuration passed to a newly created instance + * @param partition partition name, also used as the cache key + * @return the metrics for the partition + */ + public synchronized static PartitionQueueMetrics forQueue( + MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, String partition) { + QueueMetrics metrics = getQueueMetrics().get(partition); + if (metrics == null) { + metrics = + new PartitionQueueMetrics(ms, queueName, parent, enableUserMetrics, + conf, partition, false); + + // Register with the MetricsSystems + if (ms != null) { + metrics = + ms.register(pSourceName(partition).toString(), + "Metrics for partition: " + partition, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition)); + } + getQueueMetrics().put(partition, metrics); + } + + return (PartitionQueueMetrics) metrics; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index c126338ef0d..4f5d250f99f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -43,9 +43,11 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,16 +88,31 @@ MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; + private static final String ALLOCATED_RESOURCE_METRIC_PREFIX = + "AllocatedResource."; + private static final String ALLOCATED_RESOURCE_METRIC_DESC = + "Allocated NAME"; @Metric("Available memory in MB") MutableGaugeLong availableMB; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; + private static final String AVAILABLE_RESOURCE_METRIC_PREFIX = + "AvailableResource."; + private static final String AVAILABLE_RESOURCE_METRIC_DESC = + "Available NAME"; @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB; @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores; @Metric("# of pending containers") MutableGaugeInt pendingContainers; + private static final String PENDING_RESOURCE_METRIC_PREFIX = + "PendingResource."; + private static final String PENDING_RESOURCE_METRIC_DESC = + "Pending NAME allocation"; @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB; @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; - + private static final String RESERVED_RESOURCE_METRIC_PREFIX = + "ReservedResource."; + private static final String RESERVED_RESOURCE_METRIC_DESC = + "Reserved NAME"; private final MutableGaugeInt[] runningTime; private TimeBucketMetrics runBuckets; @@ -106,6 +123,8 @@ info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + protected static final MetricsInfo PARTITION_INFO = + info("Partition", "Metrics by partition"); static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults(); @@ -117,20 +136,43 @@ protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + /** + * Whether this instance maintains partition queue metrics. It is false for + * per-user and partition-level instances, so that their updates are not + * propagated to further partition metrics. + */ + protected final boolean repeatPartitionQueueMetrics; + + protected final boolean enableUserMetrics; + + protected static final MetricsInfo P_RECORD_INFO = + info("PartitionQueueMetrics", "Metrics for the resource scheduler"); + public static final String DEFAULT_PARTITION = + "default"; + + public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf) { + this(ms, queueName, parent, enableUserMetrics, conf, true); + } + protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, - boolean enableUserMetrics, Configuration conf) { + boolean enableUserMetrics, Configuration conf, + boolean partitionQueueMetrics) { registry = new MetricsRegistry(RECORD_INFO); this.queueName = queueName; this.parent = parent != null ? parent.getMetrics() : null; this.users = enableUserMetrics ?
new HashMap() : null; + this.enableUserMetrics = + enableUserMetrics; + this.repeatPartitionQueueMetrics = + partitionQueueMetrics; metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { this.queueMetricsForCustomResources = - new QueueMetricsForCustomResources(); + new QueueMetricsForCustomResources(); } } @@ -148,10 +190,30 @@ protected static StringBuilder sourceName(String queueName) { return sb; } - public synchronized - static QueueMetrics forQueue(String queueName, Queue parent, - boolean enableUserMetrics, - Configuration conf) { + /** + * Builds the metrics source name of a partition: the partition record name + * followed by one ",pN=part" segment per dot-separated part of the name. + * + * @param partition partition name + * @return builder holding the source name + */ + static StringBuilder pSourceName(String partition) { + StringBuilder sb = + new StringBuilder(P_RECORD_INFO.name()); + int i = + 0; + for (String node : Q_SPLITTER.split(partition)) { + sb.append(",p").append(i++).append('=').append(node); + } + return sb; + } + + /** + * Builds the ",qN=part" suffix of a queue path, one segment per + * dot-separated part of the queue name. + * + * @param queueName queue name + * @return builder holding the suffix + */ + static StringBuilder qSourceName(String queueName) { + StringBuilder sb = + new StringBuilder(); + int i = + 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized static QueueMetrics forQueue(String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf) { return forQueue(DefaultMetricsSystem.instance(), queueName, parent, enableUserMetrics, conf); } @@ -175,25 +237,29 @@ public synchronized static void clearQueueMetrics() { * * @return A string to {@link QueueMetrics} map.
*/ - protected static Map getQueueMetrics() { + public static Map getQueueMetrics() { return QUEUE_METRICS; } - public synchronized - static QueueMetrics forQueue(MetricsSystem ms, String queueName, - Queue parent, boolean enableUserMetrics, - Configuration conf) { + public synchronized static QueueMetrics forQueue(MetricsSystem ms, + String queueName, Queue parent, boolean enableUserMetrics, + Configuration conf) { QueueMetrics metrics = QUEUE_METRICS.get(queueName); if (metrics == null) { metrics = - new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf). - tag(QUEUE_INFO, queueName); + new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf) + .tag(QUEUE_INFO, queueName); // Register with the MetricsSystems if (ms != null) { metrics = ms.register( - sourceName(queueName).toString(), + sourceName(queueName).toString(), "Metrics for queue: " + queueName, metrics); } QUEUE_METRICS.put(queueName, metrics); @@ -203,17 +269,111 @@ static QueueMetrics forQueue(MetricsSystem ms, String queueName, } public synchronized QueueMetrics getUserMetrics(String userName) { if (users == null) { return null; } - QueueMetrics metrics = users.get(userName); + String metricName = + this.queueName + userName; + QueueMetrics metrics = + users.get(metricName); if (metrics == null) { - metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); - users.put(userName, metrics); + metrics = + new QueueMetrics(metricsSystem, queueName, null, false, conf, false); + users.put(metricName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), "Metrics for user '"+ userName +"' in queue '"+ queueName +"'", metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName)); + QUEUE_METRICS.put(metricName,
metrics); + } + return metrics; + } + + /** + * Returns the metrics of this queue within the given partition, creating + * and caching them on first use. + * + * @param partition partition name; null or RMNodeLabelsManager.NO_LABEL + * is mapped to DEFAULT_PARTITION + * @return the partition queue metrics, or null when this instance does not + * maintain partition queue metrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + if (!repeatPartitionQueueMetrics) { + return null; + } + + // Use 'default' to register into metrics system for default partition + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = + DEFAULT_PARTITION; + } + + String metricName = + partition + this.queueName; + QueueMetrics metrics = + QUEUE_METRICS.get(metricName); + if (metrics == null) { + metrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, null, + this.enableUserMetrics, this.conf, partition, false); + if (metricsSystem != null) { + metricsSystem.register( + pSourceName(partition).append(qSourceName(this.queueName)) + .toString(), + "Metrics for queue: " + this.queueName, + metrics.tag(PARTITION_INFO, partition).tag(QUEUE_INFO, + this.queueName)); + } + QUEUE_METRICS.put(metricName, metrics); + } + return metrics; + } + + /** + * Returns the metrics of the given partition as a whole, creating and + * caching them on first use. + * + * @param partition partition name; null or RMNodeLabelsManager.NO_LABEL + * is mapped to DEFAULT_PARTITION + * @return the partition metrics, or null when this queue is not "root" + */ + public synchronized QueueMetrics getPartitionMetrics(String partition) { + + /** + * Ensure metrics computation only once at Partition level rather than doing + * it even for all child queues in the queue hierarchy + */ + if (!this.queueName.equals("root")) { + return null; + } + + // Use 'default' to register into metrics system for default partition + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = + DEFAULT_PARTITION; + } + + QueueMetrics metrics = + QUEUE_METRICS.get(partition); + if (metrics == null) { + metrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, null, false, + this.conf, partition, false); + + // Register with the MetricsSystems + if
(metricsSystem != null) { + metricsSystem.register(pSourceName(partition).toString(), + "Metrics for partition: " + partition, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition)); + } + QUEUE_METRICS.put(partition, metrics); } return metrics; } @@ -357,15 +517,25 @@ public void moveAppTo(AppSchedulingInfo app) { * @param limit resource limit */ public void setAvailableResourcesToQueue(String partition, Resource limit) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.setAvailable(limit); + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.setAvailable(limit); + } + } + + /** + * Set available resources on the partition queue metrics of the given + * partition and, when they exist for this queue, on the partition metrics. + * + * @param partition Node Partition + * @param limit resource limit + */ + public void setAvailableResourcesToQueuePerPartition(String partition, + Resource limit) { + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResourcesToQueue(partition, limit); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.setAvailableResourcesToQueue(partition, limit); } } } + /** * Set available resources. To be called by scheduler periodically as * resources become available.
@@ -382,16 +552,31 @@ public void setAvailableResourcesToQueue(Resource limit) { * @param user * @param limit resource limit */ - public void setAvailableResourcesToUser(String partition, - String user, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(partition, limit); + public void setAvailableResourcesToUser(String partition, String user, + Resource limit) { + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.setAvailableResourcesToQueue(partition, limit); + } + } + + /** + * Set available resources for a user on the partition queue metrics of the + * given partition and, when they exist for this queue, on the partition + * metrics. + * + * @param partition Node Partition + * @param user name of the user + * @param limit resource limit + */ + public void setAvailableResourcesToUserPerPartition(String partition, + String user, + Resource limit) { + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResourcesToUser(partition, user, limit); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.setAvailableResourcesToUser(partition, user, limit); } } } + /** * Increment pending resource metrics * @param partition Node Partition @@ -402,14 +587,26 @@ public void setAvailableResourcesToUser(String partition, */ public void incrPendingResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(partition, user, containers, res); + _incrPendingResources(containers, res); + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrPendingResources(partition, user, containers, res); + } + if (parent !=
null) { + parent.incrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.incrPendingResources(partition, user, containers, + res); + + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + + if (partitionMetrics != null) { + partitionMetrics.incrPendingResources(partition, user, containers, res); } } } @@ -418,32 +615,104 @@ private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increasePending(res, containers); + incrResourceTypesMetrics(PENDING_RESOURCE_METRIC_PREFIX, + PENDING_RESOURCE_METRIC_DESC, containers, res); + } + + /** + * Increment the per-resource-type gauges for every resource in the given + * resource vector, creating a gauge on first use. Like + * decrResourceTypesMetrics, this does nothing unless more than two + * resource types are known, so increments and decrements stay balanced. + * + * @param metricPrefix prefix of the metric name + * @param metricDesc description of the metric; "NAME" is replaced by the + * resource name + * @param containers number of containers + * @param res resource containing memory size, vcores etc + */ + private void incrResourceTypesMetrics(String metricPrefix, String metricDesc, + int containers, Resource res) { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + for (ResourceInformation ri : res.getResources()) { + MutableGaugeLong resourceMetric = + null; + String description = + metricDesc.replace("NAME", ri.getName()); + if (this.registry.get(metricPrefix + ri.getName()) == null) { + resourceMetric = + this.registry.newGauge(metricPrefix + ri.getName(), description, 0L); + } else { + resourceMetric = + (MutableGaugeLong) this.registry.get(metricPrefix + ri.getName()); + } + resourceMetric.incr(containers * ri.getValue()); + } } } - public void decrPendingResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { -
_decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(partition, user, containers, res); + _decrPendingResources(containers, res); + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.decrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.decrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.decrPendingResources(partition, user, containers, + res); + + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + + if (partitionMetrics != null) { + partitionMetrics.decrPendingResources(partition, user, containers, res); } } } private void _decrPendingResources(int containers, Resource res) { + pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); pendingVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res, containers); + decrResourceTypesMetrics(PENDING_RESOURCE_METRIC_PREFIX, + PENDING_RESOURCE_METRIC_DESC, containers, res); + } + + /* + * Decrement different resource metrics for all resource vectors.
+ * + * @param metricPrefix prefix of the metric name + * + * @param metricDesc description of the metric + * + * @param containers number of containers + * + * @param res resource containing memory size, vcores etc + */ + private void decrResourceTypesMetrics(String metricPrefix, String metricDesc, + int containers, Resource res) { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + for (ResourceInformation ri : res.getResources()) { + MutableGaugeLong resourceMetric = + null; + String description = + metricDesc.replace("NAME", ri.getName()); + if (this.registry.get(metricPrefix + ri.getName()) == null) { + resourceMetric = + this.registry.newGauge(metricPrefix + ri.getName(), description, + 0L); + } else { + resourceMetric = + (MutableGaugeLong) this.registry.get(metricPrefix + ri.getName()); + } + resourceMetric.decr(containers * ri.getValue()); + } } } @@ -468,28 +737,50 @@ public void incrNodeTypeAggregations(String user, NodeType type) { public void allocateResources(String partition, String user, int containers, Resource res, boolean decrPending) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); - - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res, containers); + _allocateResources(containers, res, decrPending); + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, containers, res, + decrPending); + } + if (parent != null) { + parent.allocateResources(partition, user, containers, res, decrPending); + } + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.allocateResources(partition, user, containers, res, +
decrPending); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.allocateResources(partition, user, containers, res, + decrPending); } + } + } - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, - containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(partition, user, containers, res, decrPending); - } + /** + * Records an allocation on this instance only: increments the allocated + * container, memory and vcore gauges and the per-resource-type allocated + * gauges, optionally decrementing the pending ones. + * + * @param containers number of containers + * @param res resource of a single container; scaled by containers + * @param decrPending decides whether to decrease pending resource or not + */ + private void _allocateResources(int containers, Resource res, + boolean decrPending) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (decrPending) { + _decrPendingResources(containers, res); } + incrResourceTypesMetrics(ALLOCATED_RESOURCE_METRIC_PREFIX, + ALLOCATED_RESOURCE_METRIC_DESC, containers, res); } /** @@ -499,70 +790,64 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res); - } - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res); - } +
allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, res); - } - if (parent != null) { - parent.allocateResources(partition, user, res); - } + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); + + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, res); + } + if (parent != null) { + parent.allocateResources(partition, user, res); } } public void releaseResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res, containers); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(partition, user, containers, res); - } - if (parent != null) { - parent.releaseResources(partition, user, containers, res); + _releaseResources(containers, res); + + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.releaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.releaseResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.releaseResources(partition, user, containers, res); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.releaseResources(partition, user, containers, res); } } } /** -
Release Resource for container size change. - * - * @param user - * @param res + * Records a release on this instance only: decrements the allocated + * container, memory and vcore gauges and the per-resource-type allocated + * gauges, and counts the containers as released. + * + * @param containers number of containers released + * @param res resource of a single container; scaled by containers */ - private void releaseResources(String user, Resource res) { - allocatedMB.decr(res.getMemorySize()); - allocatedVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res); - } + private void _releaseResources(int containers, Resource res) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(user, res); - } - if (parent != null) { - parent.releaseResources(user, res); - } + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); + + decrResourceTypesMetrics(ALLOCATED_RESOURCE_METRIC_PREFIX, + ALLOCATED_RESOURCE_METRIC_DESC, containers, res); } public void preemptContainer() { @@ -598,49 +883,83 @@ public void updatePreemptedSecondsForCustomResources(Resource res, } public void reserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - reserveResource(user, res); + _reserveResource(res); + + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.reserveResource(partition, user, res); + } + if (parent != null) { + parent.reserveResource(partition, user, res); + } + + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.reserveResource(partition, user, res); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.reserveResource(partition, user, res); + } } } -
public void reserveResource(String user, Resource res) { + /** + * Reserve Resources for a partition with support for resource vectors + * + * @param res resource containing memory size, vcores etc + */ + public void _reserveResource(Resource res) { + int containers = + 1; reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseReserved(res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.reserveResource(user, res); - } - if (parent != null) { - parent.reserveResource(user, res); - } + incrResourceTypesMetrics(RESERVED_RESOURCE_METRIC_PREFIX, + RESERVED_RESOURCE_METRIC_DESC, containers, res); } - private void unreserveResource(String user, Resource res) { - reservedContainers.decr(); - reservedMB.decr(res.getMemorySize()); - reservedVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseReserved(res); - } - QueueMetrics userMetrics = getUserMetrics(user); + public void unreserveResource(String partition, String user, Resource res) { + _unReserveResource(res); + QueueMetrics userMetrics = + getUserMetrics(user); if (userMetrics != null) { - userMetrics.unreserveResource(user, res); + userMetrics.unreserveResource(partition, user, res); } if (parent != null) { - parent.unreserveResource(user, res); + parent.unreserveResource(partition, user, res); } - } - public void unreserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - unreserveResource(user, res); + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.unreserveResource(partition, user, res); + + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics 
!= null) { + partitionMetrics.unreserveResource(partition, user, res); + } } } + /** + * UnReserve Resources for a partition with support for resource vectors + * + * @param res resource containing memory size, vcores etc + */ + private void _unReserveResource(Resource res) { + int containers = + 1; + reservedContainers.decr(containers); + reservedMB.decr(res.getMemorySize()); + reservedVCores.decr(res.getVirtualCores()); + decrResourceTypesMetrics(RESERVED_RESOURCE_METRIC_PREFIX, + RESERVED_RESOURCE_METRIC_DESC, containers, res); + } + public void incrActiveUsers() { activeUsers.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index 83826650414..713d52e59c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -61,6 +61,13 @@ super(ms, queueName, parent, enableUserMetrics, conf); } + CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, + boolean partitionQueueMetrics) { + super(ms, queueName, parent, enableUserMetrics, conf, + partitionQueueMetrics); + } + public long getAMResourceLimitMB() { return AMResourceLimitMB.value(); } @@ -169,6 +176,10 @@ public synchronized static CSQueueMetrics forQueue(String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { MetricsSystem ms = DefaultMetricsSystem.instance(); 
QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName); + + System.out.print( + "part is queue is " + queueName + " metric is " + metrics + + " this fine\n"); if (metrics == null) { metrics = new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf) @@ -188,18 +199,26 @@ public synchronized static CSQueueMetrics forQueue(String queueName, @Override public synchronized QueueMetrics getUserMetrics(String userName) { + System.out.println("\nuser is " + userName + " users size is " + users + + " queue are " + this.queueName); if (users == null) { return null; } - CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); + + String metricName = + this.queueName + userName; + CSQueueMetrics metrics = + (CSQueueMetrics) users.get(metricName); if (metrics == null) { - metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); - users.put(userName, metrics); + metrics = + new CSQueueMetrics(metricsSystem, queueName, null, false, conf, false); + users.put(metricName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), "Metrics for user '" + userName + "' in queue '" + queueName + "'", ((CSQueueMetrics) metrics.tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName)); + getQueueMetrics().put(metricName, metrics); } return metrics; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 5c5f30f329c..90f55a970a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -250,24 +250,38 @@ public static void updateUsedCapacity(final ResourceCalculator rc, } - private static Resource getMaxAvailableResourceToQueuePartition( + private static Resource getMaxAvailableResourceToQueue( final ResourceCalculator rc, CSQueue queue, - Resource cluster, String partition) { - // Calculate guaranteed resource for a label in a queue by below logic. - // (total label resource) * (absolute capacity of label in that queue) - Resource queueGuaranteedResource = queue.getEffectiveCapacity(partition); - - // Available resource in queue for a specific label will be calculated as - // {(guaranteed resource for a label in a queue) - - // (resource usage of that label in the queue)} - Resource available = (Resources.greaterThan(rc, cluster, - queueGuaranteedResource, - queue.getQueueResourceUsage().getUsed(partition))) ? Resources - .componentwiseMax(Resources.subtractFrom(queueGuaranteedResource, - queue.getQueueResourceUsage().getUsed(partition)), Resources - .none()) : Resources.none(); - - return available; + Resource cluster) { + Set<String> nodeLabels = queue.getNodeLabelsForQueue(); + Resource totalAvailableResource = Resources.createResource(0, 0); + + for (String partition : nodeLabels) { + + // Calculate guaranteed resource for a label in a queue by below logic. + // (total label resource) * (absolute capacity of label in that queue) + Resource queueGuaranteedResource = + queue.getEffectiveCapacity(partition); + + // Available resource in queue for a specific label will be calculated as + // {(guaranteed resource for a label in a queue) - + // (resource usage of that label in the queue)} + // Finally accumulate this available resource to get total. 
+ Resource available = + (Resources.greaterThan(rc, cluster, queueGuaranteedResource, + queue.getQueueResourceUsage().getUsed(partition))) + ? Resources.componentwiseMax( + Resources.subtractFrom(queueGuaranteedResource, + queue.getQueueResourceUsage().getUsed(partition)), + Resources.none()) + : Resources.none(); + + // Update Partition Queue Metrics for the given queue and a label + queue.getMetrics().setAvailableResourcesToQueuePerPartition(partition, + available); + Resources.addTo(totalAvailableResource, available); + } + return totalAvailableResource; } /** @@ -298,28 +312,16 @@ public static void updateQueueStatistics( queueResourceUsage.getNodePartitionsSet())) { updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), partition, childQueue); - - // Update queue metrics w.r.t node labels. - // In QueueMetrics, null label is handled the same as NO_LABEL. - // This is because queue metrics for partitions are not tracked. - // In the future, will have to change this when/if queue metrics - // for partitions also get tracked. - childQueue.getMetrics().setAvailableResourcesToQueue( - partition, - getMaxAvailableResourceToQueuePartition(rc, childQueue, - cluster, partition)); } } else { updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), nodePartition, childQueue); - - // Same as above. - childQueue.getMetrics().setAvailableResourcesToQueue( - nodePartition, - getMaxAvailableResourceToQueuePartition(rc, childQueue, - cluster, nodePartition)); } - } + + // Publish the total available across all partitions of the queue. + childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition, + getMaxAvailableResourceToQueue(rc, childQueue, cluster)); + } /** * Updated configured capacity/max-capacity for queue. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a178f9e9a0b..69607f1a810 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1057,7 +1057,8 @@ public CSAssignment assignContainers(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: partition=" + candidates.getPartition() - + " #applications=" + orderingPolicy.getNumSchedulableEntities()); + + " #applications=" + orderingPolicy.getNumSchedulableEntities() + + " node=" + node.getHttpAddress() + " sche is " + schedulingMode); } setPreemptionAllowed(currentResourceLimits, candidates.getPartition()); @@ -1421,6 +1422,17 @@ private Resource getHeadroom(User user, csContext.getClusterResourceUsage().getUsed(partition)); headroom = Resources.min(resourceCalculator, clusterPartitionResource, clusterFreePartitionResource, headroom); + + if (LOG.isDebugEnabled()) { + LOG.debug("Headroom calculation for user " + user + ": " + + " user limit = " + userLimitResource + " user consumed = " + + user.getUsed(partition) + " clusterPartitionResource=" + + clusterPartitionResource + + " queueMaxAvailRes=" + currentPartitionResourceLimit + + " consumed=" + queueUsage.getUsed(partition) + " partition=" + + partition + " headroom=" + headroom); + } + return headroom; } @@ -1463,7 +1475,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp 
application, + userLimit + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" + queueUser.getUsed() + " partition=" - + nodePartition); + + nodePartition + " headroom=" + headroom); } CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider( @@ -1471,8 +1483,12 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, application.setHeadroomProvider(headroomProvider); - metrics.setAvailableResourcesToUser(nodePartition, user, headroom); + metrics.setAvailableResourcesToUserPerPartition(nodePartition, user, + headroom); + metrics.setAvailableResourcesToUser(nodePartition, user, + getUserTotalHeadroomAcrossPartitions(queueUser, clusterResource)); + return userLimit; } @@ -1724,12 +1740,16 @@ void allocateResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, true); + // Note this is a bit unconventional since it gets the object and modifies // it here, rather then using set routine Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(nodePartition, + metrics.setAvailableResourcesToUserPerPartition(nodePartition, userName, application.getHeadroom()); + metrics.setAvailableResourcesToUser(nodePartition, userName, + getUserTotalHeadroomAcrossPartitions(user, clusterResource)); + if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage.getUsed(nodePartition) + " numContainers=" @@ -1741,6 +1761,31 @@ void allocateResource(Resource clusterResource, } } + private Resource getUserTotalHeadroomAcrossPartitions(User user, + Resource clusterResource) { + + Resource totalHeadroom = + Resources.createResource(0, 0); + if (metrics.getUserMetrics(user.getUserName()) != null) { + Resource partitionHeadroom = + Resources.createResource(0, 0); + for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), + this.queueUsage.getNodePartitionsSet())) { 
+ + // TODO: confirm that always using RESPECT_PARTITION_EXCLUSIVITY is + // correct here; the mode is required to obtain the max user limit. + partitionHeadroom = + getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), + clusterResource, + getResourceLimitForActiveUsers(user.getUserName(), clusterResource, + partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + partition); + Resources.addTo(totalHeadroom, partitionHeadroom); + } + } + return totalHeadroom; + } + void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, RMContainer rmContainer) { @@ -1767,8 +1812,21 @@ void releaseResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, false); - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); + Resource partitionHeadroom = + Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = + getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), + clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUserPerPartition(nodePartition, + userName, partitionHeadroom); + + metrics.setAvailableResourcesToUser(nodePartition, userName, + getUserTotalHeadroomAcrossPartitions(user, clusterResource)); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index b0700e8bf41..0c30c01536b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -498,6 +499,7 @@ public Resource getComputedResourceLimitForActiveUsers(String userName, try { userLimitPerSchedulingMode = preComputedActiveUserLimit.get(nodePartition); + if (isRecomputeNeeded(schedulingMode, nodePartition, true)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, @@ -554,6 +556,7 @@ public Resource getComputedResourceLimitForAllUsers(String userName, writeLock.lock(); try { userLimitPerSchedulingMode = preComputedAllUserLimit.get(nodePartition); + if (isRecomputeNeeded(schedulingMode, nodePartition, false)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java index e69de29bb2d..2276cc52af0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java @@ -0,0 +1,704 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +@RunWith(MockitoJUnitRunner.class) +public class TestPartitionQueueMetrics { + + static final int GB = 1024; // MB + private static final Configuration conf = new Configuration(); + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + PartitionQueueMetrics.clearQueueMetrics(); + } + + @After + public void tearDown() { + ms.shutdown(); + + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * + * root + * / \ + * q1 q2 + * + * 
@throws Exception + */ + + @Test + public void testSinglePartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + CSQueueMetrics root = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + CSQueueMetrics q1 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q1", parentQueue, true, conf); + CSQueueMetrics q2 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q2", parentQueue, true, conf); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source= queueSource(q1.getMetricsSystem(), "x", "root.q1"); + + //checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + + q2.incrPendingResources( + "x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = + queueSource(q2.getMetricsSystem(), "x", "root.q2"); + + //checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in both partitions, x & y + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testTwoPartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, conf); + 
when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + QueueMetrics q2 = QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, conf); + q1.incrPendingResources( + "x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource xPartitionSource = partitionSource(ms, "x"); + MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(xPartitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(xRootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q2.incrPendingResources( + "y", user, 3, Resource.newInstance(1024, 1)); + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q2Source = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + } + + /** + * Structure: + * + * A single queue, q1, has been configured to run in multiple partitions, x & y + * + * root + * / + * q1 + * + * @throws Exception + */ + @Test + public void testMultiplePartitionWithSingleQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + q1.incrPendingResources( + "x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", 
"test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q1.incrPendingResources( + "x", "test_user", 3, Resource.newInstance(1024, 1)); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q1Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(userSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q1.incrPendingResources( + "x", "test_user1", 4, Resource.newInstance(1024, 1)); + MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(q1Source, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(userSource1, 0, 0, 0, 0, 0, 4*GB, 4, 4); + + q1.incrPendingResources( + "y", "test_user1", 6, Resource.newInstance(1024, 1)); + MetricsSource partitionSourceY = partitionSource(ms, "y"); + MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName); + MetricsSource q1SourceY = queueSource(ms, "y", "root.q1"); + MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1"); + + checkResources(partitionSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(rootQueueSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q1SourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(userSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in both partitions, x & y + * + * root + * / \ + * q1 q2 + * / \ / \ + * q11 q12 q21 q22 + * + * @throws Exception + */ + + @Test + public void testMultiplePartitionsWithMultiLevelQueuesMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + Queue childQueue1 = 
mock(Queue.class); + Queue childQueue2 = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, "root", null, true,conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + when(childQueue1.getMetrics()).thenReturn(q1); + QueueMetrics q11 = QueueMetrics.forQueue( + ms, "root.q1.q11", childQueue1, true, conf); + QueueMetrics q12 = QueueMetrics.forQueue( + ms, "root.q1.q12", childQueue1, true, conf); + QueueMetrics q2 = QueueMetrics.forQueue( + ms, "root.q2", parentQueue, true, conf); + when(childQueue2.getMetrics()).thenReturn(q2); + QueueMetrics q21 = QueueMetrics.forQueue( + ms, "root.q2.q21", childQueue2, true, conf); + QueueMetrics q22 = QueueMetrics.forQueue( + ms, "root.q2.q22", childQueue2, true, conf); + + q11.incrPendingResources( + "x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q11.incrPendingResources( + "x", "test_user", 4, Resource.newInstance(1024, 1)); + MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q11Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q1Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + + q12.incrPendingResources( + "x", "test_user", 5, Resource.newInstance(1024, 1)); + MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(q1Source, 0, 0, 0, 0, 0, 
11*GB, 11, 11); + checkResources(q12Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q12.incrPendingResources( + "y", "test_user", 3, Resource.newInstance(1024, 1)); + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q1YSource = queueSource(ms, "y", "root.q1"); + MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q1YSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q12YSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + + q21.incrPendingResources( + "y", "test_user", 5, Resource.newInstance(1024, 1)); + MetricsSource q21Source= queueSource(ms, "y", "root.q2.q21"); + MetricsSource q2YSource= queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 8*GB, 8, 8); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 8*GB, 8, 8); + checkResources(q2YSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q21Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q22.incrPendingResources( + "y", "test_user", 6, Resource.newInstance(1024, 1)); + MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 14*GB, 14, 14); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 14*GB, 14, 14); + checkResources(q22Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + + } + + @Test + public void testTwoLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + String partition = "x"; + + CSQueueMetrics parentMetrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + CSQueueMetrics metrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName, parentQueue, true, conf); + AppSchedulingInfo app = 
mockApp(user); + + metrics.submitApp(user); + metrics.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100*GB, 100)); + parentMetrics.setAvailableResourcesToUser(partition, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(partition, + user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(partition, + user, 6, Resources.createResource(3*GB, 3)); + + MetricsSource partitionSource = + partitionSource(metrics.getMetricsSystem(), partition); + MetricsSource parentQueueSource = + queueSource(metrics.getMetricsSystem(), partition, parentQueueName); + MetricsSource queueSource = + queueSource(metrics.getMetricsSystem(), partition, leafQueueName); + MetricsSource userSource = + userSource(metrics.getMetricsSystem(), partition, user, leafQueueName); + MetricsSource userSource1 = + userSource(metrics.getMetricsSystem(), partition, user, + parentQueueName); + + checkResources(queueSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(parentQueueSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource, + 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource1, + 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(partitionSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + + metrics.runAppAttempt(app.getApplicationId(), user); + + metrics.allocateResources(partition, + user, 3, Resources.createResource(1*GB, 1), true); + metrics.reserveResource(partition, + user, Resources.createResource(3*GB, 3)); + + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + checkResources(parentQueueSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 
15*GB, 15, 3, 3*GB, 3, 1); + checkResources(partitionSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + + metrics.allocateResources(partition, + user, 3, Resources.createResource(1*GB, 1), true); + + checkResources(queueSource, + 6*GB, 6, 6, 6, 0, 100*GB, 100, 12*GB, 12, 0, 3*GB, 3, 1); + checkResources(parentQueueSource, + 6*GB, 6, 6, 6, 0, 100*GB, 100, 12*GB, 12, 0, 3*GB, 3, 1); + + metrics.releaseResources(partition, + user, 1, Resources.createResource(2*GB, 2)); + metrics.unreserveResource(partition, + user, Resources.createResource(3*GB, 3)); + checkResources(queueSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + checkResources(parentQueueSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + checkResources(partitionSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + + metrics.finishApp(user, RMAppState.FINISHED); + } + /** + * Exercises partition metrics over a three-level queue chain: + * root -> root.leaf -> root.leaf.leaf1. Each level is created through + * CSQueueMetrics.forQueue with the boolean flag set to true (presumably + * enabling user metrics) and is linked to its parent through a mocked + * Queue whose getMetrics() returns the parent's metrics. + * + * An app is submitted as user alice on the innermost queue. Available + * resources are then set per queue and per user on all three levels for + * partitions x and y, and pending resources are added on the innermost + * queue only (count 6 with 3 GB / 3 vcores on x, count 6 with + * 4 GB / 4 vcores on y). + * + * For each partition the test looks up the partition source, the three + * queue sources and the three user sources, and expects the same pending + * totals (18 GB / 18 on x, 24 GB / 24 on y, 6 containers) at every level + * it checks, next to that level's own available resources. + * + * NOTE(review): the checks for the root and root.leaf user sources are + * commented out, so only the innermost queue's user source is verified. + */ + @Test + public void testThreeLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String leafQueueName1 = "root.leaf.leaf1"; + String user = "alice"; + String partitionX = "x"; + String partitionY = "y"; + + CSQueueMetrics parentMetrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + CSQueueMetrics metrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName, parentQueue, true, conf); + Queue leafQueue = mock(Queue.class); + when(leafQueue.getMetrics()).thenReturn(metrics); + CSQueueMetrics metrics1 = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName1, leafQueue, true, conf); + AppSchedulingInfo app = mockApp(user); + + metrics1.submitApp(user); + metrics1.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(200*GB, 200)); +
parentMetrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(500*GB, 500)); + metrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(400*GB, 400)); + metrics1.setAvailableResourcesToQueue(partitionX, + Resources.createResource(50*GB, 50)); + metrics1.setAvailableResourcesToQueue(partitionY, + Resources.createResource(300*GB, 300)); + parentMetrics.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(20*GB, 20)); + parentMetrics.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(50*GB, 50)); + metrics.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(40*GB, 40)); + metrics1.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(5*GB, 5)); + metrics1.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(30*GB, 30)); + metrics1.incrPendingResources(partitionX, + user, 6, Resources.createResource(3*GB, 3)); + metrics1.incrPendingResources(partitionY, + user, 6, Resources.createResource(4*GB, 4)); + + MetricsSource partitionSourceX = + partitionSource(metrics1.getMetricsSystem(), partitionX); + + MetricsSource parentQueueSource_X = + queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName); + MetricsSource queueSource_X = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName); + MetricsSource queueSource1_X = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1); + MetricsSource parentUserSource_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, parentQueueName); + MetricsSource userSource_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, leafQueueName); + MetricsSource userSource1_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, leafQueueName1); + +
checkResources(partitionSourceX, + 0, 0, 0, 0, 0, 200*GB, 200, 18*GB, 18, 6, 0, 0, 0); + checkResources(parentQueueSource_X, + 0, 0, 0, 0, 0, 200*GB, 200, 18*GB, 18, 6, 0, 0, 0); + checkResources(queueSource_X, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(queueSource1_X, + 0, 0, 0, 0, 0, 50*GB, 50, 18*GB, 18, 6, 0, 0, 0); + // NOTE(review): the next two checks are disabled, so parentUserSource_X and userSource_X are looked up but never verified; confirm whether they should be re-enabled or removed. + //checkResources(parentUserSource_X, + // 0, 0, 0, 0, 0, 20*GB, 20, 18*GB, 18, 6, 0, 0, 0); + //checkResources(userSource_X, + // 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource1_X, + 0, 0, 0, 0, 0, 5*GB, 5, 18*GB, 18, 6, 0, 0, 0); + + MetricsSource partitionSourceY = + partitionSource(metrics1.getMetricsSystem(), partitionY); + + MetricsSource parentQueueSource_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + parentQueueName); + MetricsSource queueSource_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + leafQueueName); + MetricsSource queueSource1_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + leafQueueName1); + MetricsSource parentUserSource_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + parentQueueName); + MetricsSource userSource_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + leafQueueName); + MetricsSource userSource1_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + leafQueueName1); + + checkResources(partitionSourceY, + 0, 0, 0, 0, 0, 500*GB, 500, 24*GB, 24, 6, 0, 0, 0); + checkResources(parentQueueSource_Y, + 0, 0, 0, 0, 0, 500*GB, 500, 24*GB, 24, 6, 0, 0, 0); + checkResources(queueSource_Y, + 0, 0, 0, 0, 0, 400*GB, 400, 24*GB, 24, 6, 0, 0, 0); + checkResources(queueSource1_Y, + 0, 0, 0, 0, 0, 300*GB, 300, 24*GB, 24, 6, 0, 0, 0); + // NOTE(review): same as for partition x -- the checks for parentUserSource_Y and userSource_Y are disabled; confirm whether they should be re-enabled or removed. + //checkResources(parentUserSource_Y, + // 0, 0, 0, 0, 0, 50*GB, 50, 24*GB, 24, 6, 0, 0, 0); + //checkResources(userSource_Y, + // 0, 0, 0, 0, 0, 40*GB, 40, 24*GB, 24, 6, 0, 0, 0); + checkResources(userSource1_Y, + 0, 0, 0, 0, 0, 30*GB, 30, 24*GB, 24, 6, 0, 0, 0); +
+ metrics1.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + + metrics1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * + * root + * / \ + * q1 q2 + * + * Both queues, q1 and q2, are created through CSQueueMetrics.forQueue with + * the boolean flag set to false (user metrics disabled, per the test name) + * and the test only touches partition x. The expected + * NullPointerException is presumably raised by the first check against + * the per-user source of root.q1, for which no source should exist. + * + * NOTE(review): if so, every statement after that check (the q2 part and + * the final finishAppAttempt/finishApp) never runs, and any unrelated + * NPE would also make the test pass -- consider asserting the missing + * user source explicitly instead of using expected = ... on the test. + * + * @throws Exception declared by the test signature + */ + @Test(expected = NullPointerException.class) + public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + CSQueueMetrics root = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root", null, false, conf); + when(parentQueue.getMetrics()).thenReturn(root); + CSQueueMetrics q1 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q1", parentQueue, false, conf); + CSQueueMetrics q2 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q2", parentQueue, false, conf); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + q1.incrPendingResources( + "x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = + partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1"); + MetricsSource q1UserSource = + userSource(q1.getMetricsSystem(), "x", user, "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1UserSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q2.incrPendingResources( + "x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = + queueSource(q2.getMetricsSystem(), "x", "root.q2"); + MetricsSource q2UserSource = + userSource(q2.getMetricsSystem(),
"x", user, "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q2UserSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + + q1.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + q1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Creates metrics for queue root.q2 in partition x through + * PartitionQueueMetrics.forQueue (the null and false arguments presumably + * mean "no parent" and "user metrics disabled" -- confirm against + * forQueue) and checks that the source looked up for partition x + * reports zero for every value the 13-argument checkResources inspects. + * + * @throws Exception declared by the test signature + */ + @Test + public void testNewPartition() + throws Exception { + + PartitionQueueMetrics metrics = + (PartitionQueueMetrics) PartitionQueueMetrics.forQueue( + ms, "root.q2", null, false, conf, "x"); + + MetricsSource partitionSource = partitionSource(ms, "x"); + + checkResources(partitionSource, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + } + + /** + * Looks up the metrics source of a partition. + * + * @param ms metrics system to query + * @param partition partition name + * @return the result of ms.getSource for the name built by + * PartitionQueueMetrics.pSourceName(partition); presumably null when no + * such source is registered + */ + public static MetricsSource partitionSource(MetricsSystem ms, + String partition) { + return ms.getSource( + PartitionQueueMetrics.pSourceName(partition).toString()); + } + + /** + * Looks up the metrics source of a queue within a partition. + * + * @param ms metrics system to query + * @param partition partition name + * @param queue queue name + * @return the result of ms.getSource for the partition source name + * followed by PartitionQueueMetrics.qSourceName(queue) + */ + public static MetricsSource queueSource(MetricsSystem ms, + String partition, String queue) { + return ms.getSource(PartitionQueueMetrics.pSourceName(partition). + append(PartitionQueueMetrics.qSourceName(queue)).toString()); + } + + /** + * Looks up the metrics source of a user within a queue and partition. + * + * @param ms metrics system to query + * @param partition partition name + * @param user user name + * @param queue queue name + * @return the result of ms.getSource for the partition and queue source + * names followed by ",user=" and the user name + */ + public static MetricsSource userSource(MetricsSystem ms, + String partition, String user, String queue) { + return ms.getSource(PartitionQueueMetrics.pSourceName(partition). + append(PartitionQueueMetrics.qSourceName(queue)).append(",user=").
+ append(user).toString()); + } + + /** + * Asserts the allocated, available and pending gauges of a source. + * Pending memory and vcores are checked twice: under PendingMB / + * PendingVCores and under PendingResource.memory-mb / + * PendingResource.vcores. + * + * @param source metrics source to snapshot with getMetrics + * @param allocatedMB expected AllocatedMB + * @param allocatedCores expected AllocatedVCores + * @param allocCtnrs expected AllocatedContainers + * @param availableMB expected AvailableMB + * @param availableCores expected AvailableVCores + * @param pendingMB expected PendingMB and PendingResource.memory-mb + * @param pendingCores expected PendingVCores and PendingResource.vcores + * @param pendingCtnrs expected PendingContainers + */ + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long availableMB, + int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("PendingResource.memory-mb", pendingMB, rb); + /* Long.valueOf presumably forces the long-typed comparison for this gauge -- confirm against MetricsAsserts. */ assertGauge("PendingResource.vcores", Long.valueOf(pendingCores), rb); + } + + /** + * Creates a Mockito mock of AppSchedulingInfo for the given user. + * Only getUser() and getApplicationAttemptId() are stubbed; the attempt + * id is built with BuilderUtils from newApplicationId(1, 1) and attempt + * number 1. NOTE(review): getApplicationId() is not stubbed here although + * the tests pass app.getApplicationId() on -- presumably Mockito's + * default return value is acceptable there; confirm. + * + * @param user user name returned by the mock's getUser() + * @return the configured mock + */ + private static AppSchedulingInfo mockApp(String user) { + AppSchedulingInfo app = mock(AppSchedulingInfo.class); + when(app.getUser()).thenReturn(user); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1); + when(app.getApplicationAttemptId()).thenReturn(id); + return app; + } + + /** + * Asserts the allocated, available, pending and reserved gauges of a + * source together with its aggregate allocated/released container + * counters. + * + * @param source metrics source to snapshot with getMetrics + * @param allocatedMB expected AllocatedMB + * @param allocatedCores expected AllocatedVCores + * @param allocCtnrs expected AllocatedContainers + * @param aggreAllocCtnrs expected AggregateContainersAllocated + * @param aggreReleasedCtnrs expected AggregateContainersReleased + * @param availableMB expected AvailableMB + * @param availableCores expected AvailableVCores + * @param pendingMB expected PendingMB + * @param pendingCores expected PendingVCores + * @param pendingCtnrs expected PendingContainers + * @param reservedMB expected ReservedMB + * @param reservedCores expected ReservedVCores + * @param reservedCtnrs expected ReservedContainers + */ + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB, + int reservedCores, int reservedCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); + assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB",
pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); + assertGauge("ReservedContainers", reservedCtnrs, rb); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 737db5b0d65..e0c4d66057a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import java.util.ArrayList; @@ -25,13 +27,21 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -44,9 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; @@ -59,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -2084,6 +2097,8 @@ public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception { csConf.setCapacityByLabel(queueB, "x", 50); csConf.setMaximumCapacityByLabel(queueB, "x", 50); + csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + // set node -> label mgr.addToCluserNodeLabels( 
ImmutableSet.of(NodeLabel.newInstance("x", false))); @@ -2102,6 +2117,85 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + + + CapacityScheduler cs = + (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode rmNode1 = + rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = + cs.getSchedulerNode(nm1.getNodeId()); + + RMNode rmNode2 = + rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = + cs.getSchedulerNode(nm2.getNodeId()); + + for (int i = + 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + for (int i = + 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + double delta = + 0.0001; + CSQueue leafQueue = + cs.getQueue("a"); + CSQueue leafQueueB = + cs.getQueue("b"); + CSQueue rootQueue = + cs.getRootQueue(); + assertEquals(20 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(12.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + + Map metrics = QueueMetrics.getQueueMetrics(); + + QueueMetrics partXMetrics = + metrics.get("x"); + + QueueMetrics partDefaultMetrics = + metrics.get("default"); + + QueueMetrics queueAMetrics = + metrics.get("root.a"); + + QueueMetrics queueBMetrics = + metrics.get("root.b"); + + QueueMetrics queueAPartDefaultMetrics = + metrics.get("defaultroot.a"); + + QueueMetrics queueBPartDefaultMetrics = + metrics.get("defaultroot.b"); + + QueueMetrics queueAPartXMetrics = + metrics.get("xroot.a"); + + QueueMetrics queueBPartXMetrics = + metrics.get("xroot.b"); + + QueueMetrics rootMetrics = + metrics.get("root"); + + assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, 
queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + + assertEquals(20 * GB, rootMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(12.5 * GB, queueBMetrics.getAvailableMB(), delta); + // app1 -> a RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); @@ -2110,14 +2204,15 @@ public RMNodeLabelsManager createNodeLabelManager() { am1.allocate("*", 1 * GB, 3, new ArrayList()); // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); } + for (int i = + 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + // app1 gets all resource in partition=x (non-exclusive) Assert.assertEquals(3, schedulerNode1.getNumContainers()); @@ -2132,35 +2227,97 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize()); Assert.assertEquals(9 * GB, reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(5 * 
GB, queueBPartXMetrics.getAvailableMB(), delta); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); + assertEquals(4 * GB, queueAMetrics.getAllocatedMB(), delta); + assertEquals(3.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta); + assertEquals(12.5 * GB, queueBMetrics.getAvailableMB(), delta); + + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(3 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(-3 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta); + // 3GB is used from label x quota. 1.5 GB is remaining from default label. // 2GB is remaining from label x. - assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB()); - assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); - + assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); + + Map metrics1 = QueueMetrics.getQueueMetrics(); + + QueueMetrics partDefaultQueueAUserMetrics = + metrics1.get("defaultroot.auser"); + + QueueMetrics partXQueueAUserMetrics = metrics1.get("xroot.auser"); + + QueueMetrics queueAUserMetrics = metrics1.get("root.auser"); + + assertEquals(4 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(4 * GB, queueAUserMetrics.getAvailableMB(), delta); + + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); - // NM2 do couple of heartbeats - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); 
- - SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } // app1 gets all resource in default partition - Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(4, schedulerNode2.getNumContainers()); // 3GB is used from label x quota. 2GB used from default label. - // So 0.5 GB is remaining from default label. - assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB()); - assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); + // So total 2.5 GB is remaining. + assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(9 * GB, leafQueue.getMetrics().getAllocatedMB()); + + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } - // The total memory tracked by QueueMetrics is 10GB - // for the default partition - CSQueue rootQueue = cs.getRootQueue(); - assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(20 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(12.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + assertEquals(9, queueAMetrics.getAggregateAllocatedContainers()); + assertEquals(9, queueAMetrics.getAggegatedReleasedContainers()); + assertEquals(4, queueAPartDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(4, queueAPartDefaultMetrics.getAggegatedReleasedContainers()); + assertEquals(5, 
queueAPartXMetrics.getAggregateAllocatedContainers()); + assertEquals(5, queueAPartXMetrics.getAggegatedReleasedContainers()); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); rm1.close(); }