diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java index e69de29bb2d..cd03aec829c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -0,0 +1,78 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; + +@Metrics(context = "yarn") +public class PartitionQueueMetrics extends QueueMetrics { + + private String partition; + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, + String partition) { + super(ms, queueName, parent, enableUserMetrics, conf); + this.partition = partition; + } + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, String partition, + boolean partitionQueueMetrics) { + super(ms, queueName, parent, enableUserMetrics, conf, partitionQueueMetrics); + this.partition = partition; + } + + /** + * Partition * Queue * User Metrics + * + * @param userName Name of the user + * @return QueueMetrics + */ + @Override + public synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + + String metricName = partition + this.queueName + userName; + QueueMetrics metrics = (PartitionQueueMetrics) users.get(metricName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(this.metricsSystem, + this.queueName, null, false, this.conf, this.partition, false); + users.put(metricName, metrics); + metricsSystem.register(pSourceName(partition).append(qSourceName(queueName)). + append(",user=").append(userName).toString(), + "Metrics for user '" + userName + "' in queue '" + queueName + "'", + ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition). + tag(QUEUE_INFO, queueName)).tag(USER_INFO, + userName)); + } + return metrics; + } + + public synchronized static PartitionQueueMetrics forQueue( + MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, String partition) { + QueueMetrics metrics = getQueueMetrics().get(partition); + if (metrics == null) { + metrics = + new PartitionQueueMetrics(ms, queueName, parent, enableUserMetrics, + conf, partition, false); + + // Register with the MetricsSystems + if (ms != null) { + metrics = + ms.register(pSourceName(partition).toString(), + "Metrics for partition: " + partition, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition)); + } + getQueueMetrics().put(partition, metrics); + } + + return (PartitionQueueMetrics) metrics; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index c126338ef0d..808858f1935 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -86,16 +87,31 @@ MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; + private static String ALLOCATED_RESOURCE_METRIC_PREFIX = + "AllocatedResource."; + private static String ALLOCATED_RESOURCE_METRIC_DESC = + "Allocated NAME"; @Metric("Available memory in MB") MutableGaugeLong availableMB; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; + private static String AVAILABLE_RESOURCE_METRIC_PREFIX = + "AvailableResource."; + private static String AVAILABLE_RESOURCE_METRIC_DESC = + "Available NAME"; @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB; @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores; @Metric("# of pending containers") MutableGaugeInt pendingContainers; + private static String PENDING_RESOURCE_METRIC_PREFIX = + "PendingResource."; + private static String PENDING_RESOURCE_METRIC_DESC = + "Pending NAME allocation"; @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB; @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; - + private static String RESERVED_RESOURCE_METRIC_PREFIX = + "ReservedResource."; + private static String RESERVED_RESOURCE_METRIC_DESC = + "Reserved NAME"; private final MutableGaugeInt[] runningTime; private TimeBucketMetrics runBuckets; @@ -106,6 +122,8 @@ info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + protected static final MetricsInfo PARTITION_INFO = + info("Partition", "Metrics by partition"); static final Splitter Q_SPLITTER = Splitter.on('.').omitEmptyStrings().trimResults(); @@ -117,20 +135,43 @@ protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + /** + * + * Ensure Partition * Queue Metrics flow doesn't get into infinite loops + + */ + protected final boolean repeatPartitionQueueMetrics; + + protected final boolean enableUserMetrics; + + protected static final MetricsInfo P_RECORD_INFO = + info("PartitionQueueMetrics", "Metrics for the resource scheduler"); + public static final String DEFAULT_PARTITION = + "default"; + + public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf) { + this(ms, queueName, parent, enableUserMetrics, conf, true); + } + protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, - boolean enableUserMetrics, Configuration conf) { + boolean enableUserMetrics, Configuration conf, + boolean partitionQueueMetrics) { registry = new MetricsRegistry(RECORD_INFO); this.queueName = queueName; this.parent = parent != null ? parent.getMetrics() : null; this.users = enableUserMetrics ? new HashMap() : null; + this.enableUserMetrics = + enableUserMetrics; + this.repeatPartitionQueueMetrics = + partitionQueueMetrics; metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { this.queueMetricsForCustomResources = - new QueueMetricsForCustomResources(); + new QueueMetricsForCustomResources(); } } @@ -148,10 +189,30 @@ protected static StringBuilder sourceName(String queueName) { return sb; } - public synchronized - static QueueMetrics forQueue(String queueName, Queue parent, - boolean enableUserMetrics, - Configuration conf) { + static StringBuilder pSourceName(String partition) { + StringBuilder sb = + new StringBuilder(P_RECORD_INFO.name()); + int i = + 0; + for (String node : Q_SPLITTER.split(partition)) { + sb.append(",p").append(i++).append('=').append(node); + } + return sb; + } + + static StringBuilder qSourceName(String queueName) { + StringBuilder sb = + new StringBuilder(); + int i = + 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized static QueueMetrics forQueue(String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf) { return forQueue(DefaultMetricsSystem.instance(), queueName, parent, enableUserMetrics, conf); } @@ -179,21 +240,20 @@ public synchronized static void clearQueueMetrics() { return QUEUE_METRICS; } - public synchronized - static QueueMetrics forQueue(MetricsSystem ms, String queueName, - Queue parent, boolean enableUserMetrics, - Configuration conf) { + public synchronized static QueueMetrics forQueue(MetricsSystem ms, + String queueName, Queue parent, boolean enableUserMetrics, + Configuration conf) { QueueMetrics metrics = QUEUE_METRICS.get(queueName); if (metrics == null) { metrics = - new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf). - tag(QUEUE_INFO, queueName); + new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf) + .tag(QUEUE_INFO, queueName); // Register with the MetricsSystems if (ms != null) { metrics = ms.register( - sourceName(queueName).toString(), + sourceName(queueName).toString(), "Metrics for queue: " + queueName, metrics); } QUEUE_METRICS.put(queueName, metrics); @@ -208,7 +268,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } QueueMetrics metrics = users.get(userName); if (metrics == null) { - metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new QueueMetrics(metricsSystem, queueName, null, false, conf, false); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), @@ -218,6 +279,89 @@ public synchronized QueueMetrics getUserMetrics(String userName) { return metrics; } + /** + * Partition * Queue Metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + if (!repeatPartitionQueueMetrics) { + return null; + } + + // Use 'default' to register into metrics system for default partition + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = + DEFAULT_PARTITION; + } + + String metricName = + partition + this.queueName; + QueueMetrics metrics = + QUEUE_METRICS.get(metricName); + + if (metrics == null) { + QueueMetrics queueMetrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, null, + this.enableUserMetrics, this.conf, partition, false); + metrics = + metricsSystem.register( + pSourceName(partition).append(qSourceName(this.queueName)) + .toString(), + "Metrics for queue: " + this.queueName, + queueMetrics.tag(PARTITION_INFO, partition).tag(QUEUE_INFO, + this.queueName)); + QUEUE_METRICS.put(metricName, queueMetrics); + return queueMetrics; + } else { + return metrics; + } + + } + + /** + * Partition Metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionMetrics(String partition) { + + /** + * Ensure metrics computation only once at Partition level rather than doing + * it even for all child queues in the queue hierarchy + */ + if (!this.queueName.equals("root")) { + return null; + } + + // Use 'default' to register into metrics system for default partition + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = + DEFAULT_PARTITION; + } + + QueueMetrics metrics = + QUEUE_METRICS.get(partition); + if (metrics == null) { + metrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, null, false, + this.conf, partition, false); + + // Register with the MetricsSystems + if (metricsSystem != null) { + metricsSystem.register(pSourceName(partition).toString(), + "Metrics for partition: " + partition, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition)); + } + QUEUE_METRICS.put(partition, metrics); + } + return metrics; + } + private ArrayList parseInts(String value) { ArrayList result = new ArrayList(); for(String s: value.split(",")) { @@ -357,15 +501,48 @@ public void moveAppTo(AppSchedulingInfo app) { * @param limit resource limit */ public void setAvailableResourcesToQueue(String partition, Resource limit) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.setAvailable(limit); + + _setAvailableResources(limit); + + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResourcesToQueue(partition, limit); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.setAvailableResourcesToQueue(partition, limit); } } } + /** + * Set Available resources with support for resource vectors. + * + * @param limit + */ + public void _setAvailableResources(Resource limit) { + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + for (ResourceInformation ri : limit.getResources()) { + MutableGaugeLong availableResource = + null; + String description = + AVAILABLE_RESOURCE_METRIC_DESC.replace("NAME", ri.getName()); + if (this.registry + .get(AVAILABLE_RESOURCE_METRIC_PREFIX + ri.getName()) == null) { + availableResource = + this.registry.newGauge( + AVAILABLE_RESOURCE_METRIC_PREFIX + ri.getName(), description, 0L); + } else { + availableResource = + (MutableGaugeLong) this.registry + .get(AVAILABLE_RESOURCE_METRIC_PREFIX + ri.getName()); + } + availableResource.set(ri.getValue()); + } + } + /** * Set available resources. To be called by scheduler periodically as * resources become available. @@ -384,14 +561,24 @@ public void setAvailableResourcesToQueue(Resource limit) { */ public void setAvailableResourcesToUser(String partition, String user, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(partition, limit); - } + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.setAvailableResourcesToQueue(partition, limit); + } + + if (parent != null) { + parent.setAvailableResourcesToUser(partition, user, limit); + } + + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResourcesToUser(partition, user, limit); } } + /** * Increment pending resource metrics * @param partition Node Partition @@ -402,14 +589,27 @@ public void setAvailableResourcesToUser(String partition, */ public void incrPendingResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(partition, user, containers, res); + + _incrPendingResources(containers, res); + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.incrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.incrPendingResources(partition, user, containers, + res); + + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + + if (partitionMetrics != null) { + partitionMetrics.incrPendingResources(partition, user, containers, res); } } } @@ -418,22 +618,62 @@ private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increasePending(res, containers); + incrResourceTypesMetrics(PENDING_RESOURCE_METRIC_PREFIX, + PENDING_RESOURCE_METRIC_DESC, containers, res); + } + + /* + * Increment different resource metrics for all resource vectors. + * + * @param metricPrefix prefix of the metric name + * + * @param metricDesc description of the metric + * + * @param containers number of containers + * + * @param res resource containing memory size, vcores etc + */ + private void incrResourceTypesMetrics(String metricPrefix, String metricDesc, + int containers, Resource res) { + for (ResourceInformation ri : res.getResources()) { + MutableGaugeLong resourceMetric = + null; + String description = + metricDesc.replace("NAME", ri.getName()); + if (this.registry.get(metricPrefix + ri.getName()) == null) { + resourceMetric = + this.registry.newGauge(metricPrefix + ri.getName(), description, 0L); + } else { + resourceMetric = + (MutableGaugeLong) this.registry.get(metricPrefix + ri.getName()); + } + resourceMetric.incr(containers * ri.getValue()); } } - public void decrPendingResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(partition, user, containers, res); + + _decrPendingResources(containers, res); + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.decrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.decrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.decrPendingResources(partition, user, containers, + res); + + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + + if (partitionMetrics != null) { + partitionMetrics.decrPendingResources(partition, user, containers, res); } } } @@ -442,8 +682,39 @@ private void _decrPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); pendingVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res, containers); + decrResourceTypesMetrics(PENDING_RESOURCE_METRIC_PREFIX, + PENDING_RESOURCE_METRIC_DESC, containers, res); + } + + /* + * Decrement different resource metrics for all resource vectors. + * + * @param metricPrefix prefix of the metric name + * + * @param metricDesc description of the metric + * + * @param containers number of containers + * + * @param res resource containing memory size, vcores etc + */ + private void decrResourceTypesMetrics(String metricPrefix, String metricDesc, + int containers, Resource res) { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + for (ResourceInformation ri : res.getResources()) { + MutableGaugeLong resourceMetric = + null; + String description = + metricDesc.replace("NAME", ri.getName()); + if (this.registry.get(metricPrefix + ri.getName()) == null) { + resourceMetric = + this.registry.newGauge(metricPrefix + ri.getName(), description, + 0L); + } else { + resourceMetric = + (MutableGaugeLong) this.registry.get(metricPrefix + ri.getName()); + } + resourceMetric.decr(containers * ri.getValue()); + } } } @@ -468,30 +739,52 @@ public void incrNodeTypeAggregations(String user, NodeType type) { public void allocateResources(String partition, String user, int containers, Resource res, boolean decrPending) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); - - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res, containers); - } - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, - containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(partition, user, containers, res, decrPending); + _allocateResources(containers, res, decrPending); + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, containers, res, + decrPending); + } + if (parent != null) { + parent.allocateResources(partition, user, containers, res, decrPending); + } + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.allocateResources(partition, user, containers, res, + decrPending); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.allocateResources(partition, user, containers, res, + decrPending); } } } + /** + * Allocate Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc + * @param decrPending decides whether to decrease pending resource or not + */ + + private void _allocateResources(int containers, Resource res, + boolean decrPending) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (decrPending) { + _decrPendingResources(containers, res); + } + incrResourceTypesMetrics(ALLOCATED_RESOURCE_METRIC_PREFIX, + ALLOCATED_RESOURCE_METRIC_DESC, containers, res); + } + /** * Allocate Resource for container size change. * @param partition Node Partition @@ -499,70 +792,64 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res); - } - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res); - } + allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, res); - } - if (parent != null) { - parent.allocateResources(partition, user, res); - } + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); + + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, res); + } + if (parent != null) { + parent.allocateResources(partition, user, res); } } public void releaseResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res, containers); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(partition, user, containers, res); - } - if (parent != null) { - parent.releaseResources(partition, user, containers, res); + _releaseResources(containers, res); + + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.releaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.releaseResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.releaseResources(partition, user, containers, res); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.releaseResources(partition, user, containers, res); } } } /** - * Release Resource for container size change. - * - * @param user - * @param res + * Release Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc */ - private void releaseResources(String user, Resource res) { - allocatedMB.decr(res.getMemorySize()); - allocatedVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res); - } + private void _releaseResources(int containers, Resource res) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(user, res); - } - if (parent != null) { - parent.releaseResources(user, res); - } + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); + + decrResourceTypesMetrics(ALLOCATED_RESOURCE_METRIC_PREFIX, + ALLOCATED_RESOURCE_METRIC_DESC, containers, res); } public void preemptContainer() { @@ -598,49 +885,83 @@ public void updatePreemptedSecondsForCustomResources(Resource res, } public void reserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - reserveResource(user, res); + _reserveResource(res); + + QueueMetrics userMetrics = + getUserMetrics(user); + if (userMetrics != null) { + userMetrics.reserveResource(partition, user, res); + } + if (parent != null) { + parent.reserveResource(partition, user, res); + } + + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.reserveResource(partition, user, res); + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.reserveResource(partition, user, res); + } } } - public void reserveResource(String user, Resource res) { + /** + * Reserve Resources for a partition with support for resource vectors + * + * @param res resource containing memory size, vcores etc + */ + public void _reserveResource(Resource res) { + int containers = + 1; reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseReserved(res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.reserveResource(user, res); - } - if (parent != null) { - parent.reserveResource(user, res); - } + incrResourceTypesMetrics(RESERVED_RESOURCE_METRIC_PREFIX, + RESERVED_RESOURCE_METRIC_DESC, containers, res); } - private void unreserveResource(String user, Resource res) { - reservedContainers.decr(); - reservedMB.decr(res.getMemorySize()); - reservedVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseReserved(res); - } - QueueMetrics userMetrics = getUserMetrics(user); + public void unreserveResource(String partition, String user, Resource res) { + _unReserveResource(res); + QueueMetrics userMetrics = + getUserMetrics(user); if (userMetrics != null) { - userMetrics.unreserveResource(user, res); + userMetrics.unreserveResource(partition, user, res); } if (parent != null) { - parent.unreserveResource(user, res); + parent.unreserveResource(partition, user, res); } - } - public void unreserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - unreserveResource(user, res); + QueueMetrics partitionQueueMetrics = + getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.unreserveResource(partition, user, res); + + QueueMetrics partitionMetrics = + getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.unreserveResource(partition, user, res); + } } } + /** + * UnReserve Resources for a partition with support for resource vectors + * + * @param res resource containing memory size, vcores etc + */ + private void _unReserveResource(Resource res) { + int containers = + 1; + reservedContainers.decr(containers); + reservedMB.decr(res.getMemorySize()); + reservedVCores.decr(res.getVirtualCores()); + decrResourceTypesMetrics(RESERVED_RESOURCE_METRIC_PREFIX, + RESERVED_RESOURCE_METRIC_DESC, containers, res); + } + public void incrActiveUsers() { activeUsers.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index 83826650414..b29432df4ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -61,6 +61,13 @@ super(ms, queueName, parent, enableUserMetrics, conf); } + CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, + boolean partitionQueueMetrics) { + super(ms, queueName, parent, enableUserMetrics, conf, + partitionQueueMetrics); + } + public long getAMResourceLimitMB() { return AMResourceLimitMB.value(); } @@ -193,7 +200,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); if (metrics == null) { - metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new CSQueueMetrics(metricsSystem, queueName, null, false, conf, false); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java index e69de29bb2d..2276cc52af0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java @@ -0,0 +1,704 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +@RunWith(MockitoJUnitRunner.class) +public class TestPartitionQueueMetrics { + + static final int GB = 1024; // MB + private static final Configuration conf = new Configuration(); + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + PartitionQueueMetrics.clearQueueMetrics(); + } + + @After + public void tearDown() { + ms.shutdown(); + + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + + @Test + public void testSinglePartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + CSQueueMetrics root = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + CSQueueMetrics q1 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q1", parentQueue, true, conf); + CSQueueMetrics q2 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q2", parentQueue, true, conf); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source= queueSource(q1.getMetricsSystem(), "x", "root.q1"); + + //checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + + q2.incrPendingResources( + "x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = + queueSource(q2.getMetricsSystem(), "x", "root.q2"); + + //checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in both partitions, x & y + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testTwoPartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + QueueMetrics q2 = QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, conf); + q1.incrPendingResources( + "x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource xPartitionSource = partitionSource(ms, "x"); + MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(xPartitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(xRootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q2.incrPendingResources( + "y", user, 3, Resource.newInstance(1024, 1)); + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q2Source = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + } + + /** + * Structure: + * + * Both queues, q1 has been configured to run in multiple partitions, x & y + * + * root + * / + * q1 + * + * @throws Exception + */ + @Test + public void testMultiplePartitionWithSingleQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + q1.incrPendingResources( + "x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q1.incrPendingResources( + "x", "test_user", 3, Resource.newInstance(1024, 1)); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q1Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(userSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q1.incrPendingResources( + "x", "test_user1", 4, Resource.newInstance(1024, 1)); + MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(q1Source, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(userSource1, 0, 0, 0, 0, 0, 4*GB, 4, 4); + + q1.incrPendingResources( + "y", "test_user1", 6, Resource.newInstance(1024, 1)); + MetricsSource partitionSourceY = partitionSource(ms, "y"); + MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName); + MetricsSource q1SourceY = queueSource(ms, "y", "root.q1"); + MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1"); + + checkResources(partitionSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(rootQueueSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q1SourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(userSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in both partitions, x & y + * + * root + * / \ + * q1 q2 + * / \ / \ + * q11 q12 q21 q22 + * + * @throws Exception + */ + + @Test + public void testMultiplePartitionsWithMultiLevelQueuesMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + Queue childQueue1 = mock(Queue.class); + Queue childQueue2 = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, "root", null, true,conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + when(childQueue1.getMetrics()).thenReturn(q1); + QueueMetrics q11 = QueueMetrics.forQueue( + ms, "root.q1.q11", childQueue1, true, conf); + QueueMetrics q12 = QueueMetrics.forQueue( + ms, "root.q1.q12", childQueue1, true, conf); + QueueMetrics q2 = QueueMetrics.forQueue( + ms, "root.q2", parentQueue, true, conf); + when(childQueue2.getMetrics()).thenReturn(q2); + QueueMetrics q21 = QueueMetrics.forQueue( + ms, "root.q2.q21", childQueue2, true, conf); + QueueMetrics q22 = QueueMetrics.forQueue( + ms, "root.q2.q22", childQueue2, true, conf); + + q11.incrPendingResources( + "x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q11.incrPendingResources( + "x", "test_user", 4, Resource.newInstance(1024, 1)); + MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q11Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q1Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + + q12.incrPendingResources( + "x", "test_user", 5, Resource.newInstance(1024, 1)); + MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(q1Source, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(q12Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q12.incrPendingResources( + "y", "test_user", 3, Resource.newInstance(1024, 1)); + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q1YSource = queueSource(ms, "y", "root.q1"); + MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q1YSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q12YSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + + q21.incrPendingResources( + "y", "test_user", 5, Resource.newInstance(1024, 1)); + MetricsSource q21Source= queueSource(ms, "y", "root.q2.q21"); + MetricsSource q2YSource= queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 8*GB, 8, 8); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 8*GB, 8, 8); + checkResources(q2YSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q21Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q22.incrPendingResources( + "y", "test_user", 6, Resource.newInstance(1024, 1)); + MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 14*GB, 14, 14); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 14*GB, 14, 14); + checkResources(q22Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + + } + + @Test + public void testTwoLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + String partition = "x"; + + CSQueueMetrics parentMetrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + CSQueueMetrics metrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName, parentQueue, true, conf); + AppSchedulingInfo app = mockApp(user); + + metrics.submitApp(user); + metrics.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100*GB, 100)); + parentMetrics.setAvailableResourcesToUser(partition, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(partition, + user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(partition, + user, 6, Resources.createResource(3*GB, 3)); + + MetricsSource partitionSource = + partitionSource(metrics.getMetricsSystem(), partition); + MetricsSource parentQueueSource = + queueSource(metrics.getMetricsSystem(), partition, parentQueueName); + MetricsSource queueSource = + queueSource(metrics.getMetricsSystem(), partition, leafQueueName); + MetricsSource userSource = + userSource(metrics.getMetricsSystem(), partition, user, leafQueueName); + MetricsSource userSource1 = + userSource(metrics.getMetricsSystem(), partition, user, + parentQueueName); + + checkResources(queueSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(parentQueueSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource, + 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource1, + 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(partitionSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + + metrics.runAppAttempt(app.getApplicationId(), user); + + metrics.allocateResources(partition, + user, 3, Resources.createResource(1*GB, 1), true); + metrics.reserveResource(partition, + user, Resources.createResource(3*GB, 3)); + + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + checkResources(parentQueueSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + checkResources(partitionSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + + metrics.allocateResources(partition, + user, 3, Resources.createResource(1*GB, 1), true); + + checkResources(queueSource, + 6*GB, 6, 6, 6, 0, 100*GB, 100, 12*GB, 12, 0, 3*GB, 3, 1); + checkResources(parentQueueSource, + 6*GB, 6, 6, 6, 0, 100*GB, 100, 12*GB, 12, 0, 3*GB, 3, 1); + + metrics.releaseResources(partition, + user, 1, Resources.createResource(2*GB, 2)); + metrics.unreserveResource(partition, + user, Resources.createResource(3*GB, 3)); + checkResources(queueSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + checkResources(parentQueueSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + checkResources(partitionSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + + metrics.finishApp(user, RMAppState.FINISHED); + } + + @Test + public void testThreeLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String leafQueueName1 = "root.leaf.leaf1"; + String user = "alice"; + String partitionX = "x"; + String partitionY = "y"; + + CSQueueMetrics parentMetrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + CSQueueMetrics metrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName, parentQueue, true, conf); + Queue leafQueue = mock(Queue.class); + when(leafQueue.getMetrics()).thenReturn(metrics); + CSQueueMetrics metrics1 = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName1, leafQueue, true, conf); + AppSchedulingInfo app = mockApp(user); + + metrics1.submitApp(user); + metrics1.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(200*GB, 200)); + parentMetrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(500*GB, 500)); + metrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(400*GB, 400)); + metrics1.setAvailableResourcesToQueue(partitionX, + Resources.createResource(50*GB, 50)); + metrics1.setAvailableResourcesToQueue(partitionY, + Resources.createResource(300*GB, 300)); + parentMetrics.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(20*GB, 20)); + parentMetrics.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(50*GB, 50)); + metrics.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(40*GB, 40)); + metrics1.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(5*GB, 5)); + metrics1.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(30*GB, 30)); + metrics1.incrPendingResources(partitionX, + user, 6, Resources.createResource(3*GB, 3)); + metrics1.incrPendingResources(partitionY, + user, 6, Resources.createResource(4*GB, 4)); + + MetricsSource partitionSourceX = + partitionSource(metrics1.getMetricsSystem(), partitionX); + + MetricsSource parentQueueSource_X = + queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName); + MetricsSource queueSource_X = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName); + MetricsSource queueSource1_X = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1); + MetricsSource parentUserSource_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, parentQueueName); + MetricsSource userSource_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, leafQueueName); + MetricsSource userSource1_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, leafQueueName1); + + checkResources(partitionSourceX, + 0, 0, 0, 0, 0, 200*GB, 200, 18*GB, 18, 6, 0, 0, 0); + checkResources(parentQueueSource_X, + 0, 0, 0, 0, 0, 200*GB, 200, 18*GB, 18, 6, 0, 0, 0); + checkResources(queueSource_X, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(queueSource1_X, + 0, 0, 0, 0, 0, 50*GB, 50, 18*GB, 18, 6, 0, 0, 0); + //checkResources(parentUserSource_X, + // 0, 0, 0, 0, 0, 20*GB, 20, 18*GB, 18, 6, 0, 0, 0); + //checkResources(userSource_X, + // 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource1_X, + 0, 0, 0, 0, 0, 5*GB, 5, 18*GB, 18, 6, 0, 0, 0); + + MetricsSource partitionSourceY = + partitionSource(metrics1.getMetricsSystem(), partitionY); + + MetricsSource parentQueueSource_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + parentQueueName); + MetricsSource queueSource_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + leafQueueName); + MetricsSource queueSource1_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + leafQueueName1); + MetricsSource parentUserSource_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + parentQueueName); + MetricsSource userSource_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + leafQueueName); + MetricsSource userSource1_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + leafQueueName1); + + checkResources(partitionSourceY, + 0, 0, 0, 0, 0, 500*GB, 500, 24*GB, 24, 6, 0, 0, 0); + checkResources(parentQueueSource_Y, + 0, 0, 0, 0, 0, 500*GB, 500, 24*GB, 24, 6, 0, 0, 0); + checkResources(queueSource_Y, + 0, 0, 0, 0, 0, 400*GB, 400, 24*GB, 24, 6, 0, 0, 0); + checkResources(queueSource1_Y, + 0, 0, 0, 0, 0, 300*GB, 300, 24*GB, 24, 6, 0, 0, 0); + //checkResources(parentUserSource_Y, + // 0, 0, 0, 0, 0, 50*GB, 50, 24*GB, 24, 6, 0, 0, 0); + //checkResources(userSource_Y, + // 0, 0, 0, 0, 0, 40*GB, 40, 24*GB, 24, 6, 0, 0, 0); + checkResources(userSource1_Y, + 0, 0, 0, 0, 0, 30*GB, 30, 24*GB, 24, 6, 0, 0, 0); + + metrics1.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + + metrics1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * UserMetrics has been disabled, hence trying to access the user source + * throws NPE from sources + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test(expected = NullPointerException.class) + public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + CSQueueMetrics root = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root", null, false, conf); + when(parentQueue.getMetrics()).thenReturn(root); + CSQueueMetrics q1 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q1", parentQueue, false, conf); + CSQueueMetrics q2 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q2", parentQueue, false, conf); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + q1.incrPendingResources( + "x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = + partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source= queueSource(q1.getMetricsSystem(), "x", "root.q1"); + MetricsSource q1UserSource= + userSource(q1.getMetricsSystem(), "x", user, "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1UserSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q2.incrPendingResources( + "x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = + queueSource(q2.getMetricsSystem(), "x", "root.q2"); + MetricsSource q2UserSource= + userSource(q1.getMetricsSystem(), "x", user, "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q2UserSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + + q1.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + q1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * UserMetrics has been disabled, hence trying to access the user source + * throws NPE from sources + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testNewPartition() + throws Exception { + + PartitionQueueMetrics metrics = + (PartitionQueueMetrics) PartitionQueueMetrics.forQueue( + ms, "root.q2", null, false, conf, "x"); + + MetricsSource partitionSource = partitionSource(ms, "x"); + + checkResources(partitionSource, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + } + + public static MetricsSource partitionSource(MetricsSystem ms, + String partition) { + MetricsSource s = ms.getSource( + PartitionQueueMetrics.pSourceName(partition).toString()); + return s; + } + + public static MetricsSource queueSource(MetricsSystem ms, + String partition, String queue) { + MetricsSource s = ms.getSource(PartitionQueueMetrics.pSourceName(partition). + append(PartitionQueueMetrics.qSourceName(queue)).toString()); + return s; + } + + public static MetricsSource userSource(MetricsSystem ms, + String partition, String user, String queue) { + MetricsSource s = ms.getSource(PartitionQueueMetrics.pSourceName(partition). + append(PartitionQueueMetrics.qSourceName(queue)).append(",user="). + append(user).toString()); + return s; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long availableMB, + int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("PendingResource.memory-mb", pendingMB, rb); + assertGauge("PendingResource.vcores", Long.valueOf(pendingCores), rb); + } + + private static AppSchedulingInfo mockApp(String user) { + AppSchedulingInfo app = mock(AppSchedulingInfo.class); + when(app.getUser()).thenReturn(user); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1); + when(app.getApplicationAttemptId()).thenReturn(id); + return app; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB, + int reservedCores, int reservedCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); + assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); + assertGauge("ReservedContainers", reservedCtnrs, rb); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 2066f607c53..669f558c57e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -38,6 +38,7 @@ import org.junit.Test; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED; @@ -84,8 +85,8 @@ public void setUp() { public void testDefaultSingleQueueMetrics() { String queueName = "single"; - QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, - conf); + QueueMetrics metrics = + QueueMetrics.forQueue(ms, queueName, null, false, conf); MetricsSource queueSource= queueSource(ms, queueName); AppSchedulingInfo app = mockApp(USER); @@ -121,15 +122,10 @@ public void testDefaultSingleQueueMetrics() { metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, USER, 3, Resources.createResource(2*GB, 2), true); - rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .checkAgainst(queueSource); + + rmChecker = + checkAllocatedMetricResourceTypes(queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 0, 0, 0); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, USER, 1, Resources.createResource(2*GB, 2)); @@ -250,7 +246,7 @@ public void testSingleQueueWithUserMetrics() { String queueName = "single2"; QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true, - conf); + conf); MetricsSource queueSource = queueSource(ms, queueName); AppSchedulingInfo app = mockApp(USER_2); @@ -315,25 +311,11 @@ public void testSingleQueueWithUserMetrics() { metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, USER_2, 3, Resources.createResource(2*GB, 2), true); resMetricsQueueSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .checkAgainst(queueSource); + checkAllocatedMetricResourceTypes(queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 0, 0, 0); resMetricsUserSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .checkAgainst(userSource); + checkAllocatedMetricResourceTypes(userSource, 6 * GB, 6, 3, 3, 0, 10 * GB, + 10, 9 * GB, 9, 2, 0, 0, 0); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, USER_2, 1, Resources.createResource(2*GB, 2)); @@ -522,58 +504,17 @@ public void testTwoLevelWithUserMetrics() { // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources resMetricsQueueSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(leaf.queueSource); + checkReservedMetricResourceTypes(leaf.queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 3 * GB, 3, 1); resMetricsParentQueueSourceChecker = - ResourceMetricsChecker - .createFromChecker(resMetricsParentQueueSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(root.queueSource); + checkReservedMetricResourceTypes(root.queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 3 * GB, 3, 1); resMetricsUserSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(leaf.userSource); - resMetricsParentUserSourceChecker = ResourceMetricsChecker - .createFromChecker(resMetricsParentUserSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(root.userSource); + checkReservedMetricResourceTypes(leaf.userSource, 6 * GB, 6, 3, 3, 0, + 10 * GB, 10, 9 * GB, 9, 2, 3 * GB, 3, 1); + resMetricsParentUserSourceChecker = + checkReservedMetricResourceTypes(root.userSource, 6 * GB, 6, 3, 3, 0, + 10 * GB, 10, 9 * GB, 9, 2, 3 * GB, 3, 1); leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL, USER, 1, Resources.createResource(2*GB, 2)); @@ -731,6 +672,59 @@ private static void checkAggregatedNodeTypes(MetricsSource source, assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb); } + private ResourceMetricsChecker checkAllocatedMetricResourceTypes( + MetricsSource source, long allocatedMB, int allocatedCores, + int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, + long availableMB, int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs, long reservedMB, int reservedCores, int reservedCtnrs) { + + ResourceMetricsChecker rmChecker = + ResourceMetricsChecker.create().gaugeLong(ALLOCATED_MB, allocatedMB) + .gaugeInt(ALLOCATED_V_CORES, allocatedCores) + .gaugeInt(ALLOCATED_CONTAINERS, allocCtnrs) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, aggreAllocCtnrs) + .counter(AGGREGATE_CONTAINERS_RELEASED, aggreReleasedCtnrs) + .gaugeLong(AVAILABLE_MB, availableMB) + .gaugeInt(AVAILABLE_V_CORES, availableCores) + .gaugeLong(PENDING_MB, pendingMB) + .gaugeInt(PENDING_V_CORES, pendingCores) + .gaugeInt(PENDING_CONTAINERS, pendingCtnrs) + .gaugeLong(RESERVED_MB, reservedMB) + .gaugeInt(RESERVED_V_CORES, reservedCores) + .gaugeInt(RESERVED_CONTAINERS, reservedCtnrs).checkAgainst(source); + MetricsRecordBuilder rb = + getMetrics(source); + assertGauge("AllocatedResource.memory-mb", allocatedMB, rb); + assertGauge("AllocatedResource.vcores", Long.valueOf(allocatedCores), rb); + return rmChecker; + } + + private ResourceMetricsChecker checkReservedMetricResourceTypes( + MetricsSource source, long allocatedMB, int allocatedCores, + int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, + long availableMB, int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs, long reservedMB, int reservedCores, int reservedCtnrs) { + ResourceMetricsChecker rmChecker = + ResourceMetricsChecker.create().gaugeLong(ALLOCATED_MB, allocatedMB) + .gaugeInt(ALLOCATED_V_CORES, allocatedCores) + .gaugeInt(ALLOCATED_CONTAINERS, allocCtnrs) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, aggreAllocCtnrs) + .counter(AGGREGATE_CONTAINERS_RELEASED, aggreReleasedCtnrs) + .gaugeLong(AVAILABLE_MB, availableMB) + .gaugeInt(AVAILABLE_V_CORES, availableCores) + .gaugeLong(PENDING_MB, pendingMB) + .gaugeInt(PENDING_V_CORES, pendingCores) + .gaugeInt(PENDING_CONTAINERS, pendingCtnrs) + .gaugeLong(RESERVED_MB, reservedMB) + .gaugeInt(RESERVED_V_CORES, reservedCores) + .gaugeInt(RESERVED_CONTAINERS, reservedCtnrs).checkAgainst(source); + MetricsRecordBuilder rb = + getMetrics(source); + assertGauge("ReservedResource.memory-mb", reservedMB, rb); + assertGauge("ReservedResource.vcores", Long.valueOf(reservedCores), rb); + return rmChecker; + } + static AppSchedulingInfo mockApp(String user) { AppSchedulingInfo app = mock(AppSchedulingInfo.class); when(app.getUser()).thenReturn(user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 9cfddd66f66..e53082dfca8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -2020,8 +2020,8 @@ public RMNodeLabelsManager createNodeLabelManager() { reportNm2.getAvailableResource().getMemorySize()); LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB()); - assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB()); + assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); @@ -2123,8 +2123,8 @@ public RMNodeLabelsManager createNodeLabelManager() { double delta = 0.0001; // 3GB is used from label x quota. 1.5 GB is remaining from default label. // 2GB is remaining from label x. - assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); - assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); @@ -2141,7 +2141,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // 3GB is used from label x quota. 2GB used from default label. // So total 2.5 GB is remaining. assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); - assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); rm1.close(); }