diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java new file mode 100644 index 00000000000..57b922db0d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -0,0 +1,113 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; + +@Metrics(context = "yarn") +public class PartitionQueueMetrics extends QueueMetrics { + + private String partition; + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, + String partition) { + super(ms, queueName, parent, enableUserMetrics, conf); + this.partition = partition; + } + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, String partition, + boolean partitionQueueMetrics) { + super(ms, queueName, parent, enableUserMetrics, conf, partitionQueueMetrics); + this.partition = partition; + } + + /** + * Partition * Queue * User Metrics + * + * @param userName Name of the user + * @return QueueMetrics + */ + @Override + public synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + + String metricName = partition + this.queueName + userName; + QueueMetrics metrics = (PartitionQueueMetrics) users.get(metricName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(this.metricsSystem, + this.queueName, null, false, this.conf, this.partition, false); + users.put(metricName, metrics); + metricsSystem.register(pSourceName(partition).append(qSourceName(queueName)). + append(",user=").append(userName).toString(), + "Metrics for user '" + userName + "' in queue '" + queueName + "'", + ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition). + tag(QUEUE_INFO, queueName)).tag(USER_INFO, + userName)); + getQueueMetrics().put(metricName, metrics); + } + return metrics; + } + + public synchronized static PartitionQueueMetrics forQueue( + MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, String partition) { + QueueMetrics metrics = getQueueMetrics().get(partition); + if (metrics == null) { + metrics = + new PartitionQueueMetrics(ms, queueName, parent, enableUserMetrics, + conf, partition, false); + + // Register with the MetricsSystems + if (ms != null) { + metrics = + ms.register(pSourceName(partition).toString(), + "Metrics for partition: " + partition, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition)); + } + getQueueMetrics().put(partition, metrics); + } + + return (PartitionQueueMetrics) metrics; + } + + + /** + * Partition * Queue Metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + + if (!repeatPartitionQueueMetrics) { + return null; + } + + // Use 'default' to register into metrics system for default partition + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + } + + QueueMetrics metrics = getQueueMetrics().get(partition); + if (metrics == null) { + metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null, + false, this.conf, partition, false); + + // Register with the MetricsSystems + if (metricsSystem != null) { + metricsSystem.register(pSourceName(partition).toString(), + "Metrics for partition: " + partition, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition)); + } + getQueueMetrics().put(partition, metrics); + } + return metrics; + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 1b926a43e08..518ec3043cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -112,6 +112,8 @@ info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + protected static final MetricsInfo PARTITION_INFO = + info("Partition", "Metrics by partition"); static final Splitter Q_SPLITTER = Splitter.on('.').omitEmptyStrings().trimResults(); @@ -123,6 +125,19 @@ protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + /** + * + * Ensure Partition * Queue Metrics flow doesn't get into infinite loops + */ + protected final boolean repeatPartitionQueueMetrics; + + protected final boolean enableUserMetrics; + + protected static final MetricsInfo P_RECORD_INFO = + info("PartitionQueueMetrics", "Metrics for the resource scheduler"); + + public static final String DEFAULT_PARTITION = "default"; + private static final String ALLOCATED_RESOURCE_METRIC_PREFIX = "AllocatedResource."; private static final String ALLOCATED_RESOURCE_METRIC_DESC = @@ -148,13 +163,22 @@ private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = "Aggregate Preempted Seconds for NAME"; - protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, - boolean enableUserMetrics, Configuration conf) { + public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf) { + this(ms, queueName, parent, enableUserMetrics, conf, true); + } + + protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, + boolean partitionQueueMetrics) { + registry = new MetricsRegistry(RECORD_INFO); this.queueName = queueName; this.parent = parent != null ? parent.getMetrics() : null; - this.users = enableUserMetrics ? new HashMap() - : null; + this.users = enableUserMetrics ? new HashMap() : null; + this.enableUserMetrics = enableUserMetrics; + this.repeatPartitionQueueMetrics = partitionQueueMetrics; + metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); @@ -180,12 +204,25 @@ protected static StringBuilder sourceName(String queueName) { return sb; } - public synchronized - static QueueMetrics forQueue(String queueName, Queue parent, - boolean enableUserMetrics, - Configuration conf) { + static StringBuilder pSourceName(String partition) { + StringBuilder sb = new StringBuilder(P_RECORD_INFO.name()); + sb.append(",partition").append('=').append(partition); + return sb; + } + + static StringBuilder qSourceName(String queueName) { + StringBuilder sb = new StringBuilder(); + int i = 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized static QueueMetrics forQueue(String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf) { return forQueue(DefaultMetricsSystem.instance(), queueName, parent, - enableUserMetrics, conf); + enableUserMetrics, conf); } /** @@ -211,22 +248,18 @@ public synchronized static void clearQueueMetrics() { return QUEUE_METRICS; } - public synchronized - static QueueMetrics forQueue(MetricsSystem ms, String queueName, - Queue parent, boolean enableUserMetrics, - Configuration conf) { + public synchronized static QueueMetrics forQueue(MetricsSystem ms, + String queueName, Queue parent, boolean enableUserMetrics, + Configuration conf) { QueueMetrics metrics = QUEUE_METRICS.get(queueName); if (metrics == null) { - metrics = - new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf). - tag(QUEUE_INFO, queueName); - + metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf) + .tag(QUEUE_INFO, queueName); + // Register with the MetricsSystems if (ms != null) { - metrics = - ms.register( - sourceName(queueName).toString(), - "Metrics for queue: " + queueName, metrics); + metrics = ms.register(sourceName(queueName).toString(), + "Metrics for queue: " + queueName, metrics); } QUEUE_METRICS.put(queueName, metrics); } @@ -240,7 +273,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } QueueMetrics metrics = users.get(userName); if (metrics == null) { - metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new QueueMetrics(metricsSystem, queueName, null, false, conf, false); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), @@ -250,6 +284,43 @@ public synchronized QueueMetrics getUserMetrics(String userName) { return metrics; } + /** + * Partition * Queue Metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + if (!repeatPartitionQueueMetrics) { + return null; + } + + // Use 'default' to register into metrics system for default partition + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + } + + boolean partitionQueueMetricsFlag = + (this.queueName.equals("root")) ? true : false; + + String metricName = partition + this.queueName; + QueueMetrics metrics = QUEUE_METRICS.get(metricName); + if (metrics == null) { + QueueMetrics queueMetrics = new PartitionQueueMetrics(metricsSystem, + this.queueName, null, this.enableUserMetrics, this.conf, partition, + partitionQueueMetricsFlag); + metrics = metricsSystem.register( + pSourceName(partition).append(qSourceName(this.queueName)).toString(), + "Metrics for queue: " + this.queueName, queueMetrics + .tag(PARTITION_INFO, partition).tag(QUEUE_INFO, this.queueName)); + QUEUE_METRICS.put(metricName, queueMetrics); + return queueMetrics; + } else { + return metrics; + } + } + private ArrayList parseInts(String value) { ArrayList result = new ArrayList(); for(String s: value.split(",")) { @@ -389,21 +460,33 @@ public void moveAppTo(AppSchedulingInfo app) { * @param limit resource limit */ public void setAvailableResourcesToQueue(String partition, Resource limit) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.setAvailable(limit); - registerCustomResources( - queueMetricsForCustomResources.getAvailableValues(), - AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); - } + _setAvailableResources(limit); + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResourcesToQueue(partition, limit); + } + } + + /** + * Set Available resources with support for resource vectors. + * + * @param limit + */ + public void _setAvailableResources(Resource limit) { + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.setAvailable(limit); + registerCustomResources( + queueMetricsForCustomResources.getAvailableValues(), + AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); } } /** * Set available resources. To be called by scheduler periodically as * resources become available. + * * @param limit resource limit */ public void setAvailableResourcesToQueue(Resource limit) { @@ -413,39 +496,51 @@ public void setAvailableResourcesToQueue(Resource limit) { /** * Set available resources. To be called by scheduler periodically as * resources become available. + * * @param partition Node Partition * @param user * @param limit resource limit */ - public void setAvailableResourcesToUser(String partition, - String user, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(partition, limit); - } + public void setAvailableResourcesToUser(String partition, String user, + Resource limit) { + + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.setAvailableResourcesToQueue(partition, limit); + } + if (parent != null) { + parent.setAvailableResourcesToUser(partition, user, limit); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResourcesToUser(partition, user, limit); } } /** * Increment pending resource metrics + * * @param partition Node Partition * @param user * @param containers - * @param res the TOTAL delta of resources note this is different from - * the other APIs which use per container resource + * @param res the TOTAL delta of resources note this is different from the + * other APIs which use per container resource */ public void incrPendingResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(partition, user, containers, res); - } + + _incrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.incrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.incrPendingResources(partition, user, containers, + res); } } @@ -512,15 +607,19 @@ private void _incrPendingResources(int containers, Resource res) { public void decrPendingResources(String partition, String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(partition, user, containers, res); - } + + _decrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.decrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.decrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.decrPendingResources(partition, user, containers, + res); } } @@ -554,32 +653,43 @@ public void incrNodeTypeAggregations(String user, NodeType type) { } } - public void allocateResources(String partition, String user, - int containers, Resource res, boolean decrPending) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); + public void allocateResources(String partition, String user, int containers, + Resource res, boolean decrPending) { + _allocateResources(containers, res, decrPending); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, containers, res, + decrPending); + } + if (parent != null) { + parent.allocateResources(partition, user, containers, res, decrPending); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.allocateResources(partition, user, containers, res, + decrPending); + } + } - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res, containers); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, - containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(partition, user, containers, res, decrPending); - } + /** + * Allocate Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc + * @param decrPending decides whether to decrease pending resource or not + */ + private void _allocateResources(int containers, Resource res, + boolean decrPending) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res, containers); + } + if (decrPending) { + _decrPendingResources(containers, res); } } @@ -590,56 +700,66 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } + allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); + } - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res); - registerCustomResources( - queueMetricsForCustomResources.getPendingValues(), - PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); - } + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreasePending(res); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); + } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, res); - } - if (parent != null) { - parent.allocateResources(partition, user, res); - } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, res); + } + if (parent != null) { + parent.allocateResources(partition, user, res); } + } - public void releaseResources(String partition, - String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res, containers); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } + public void releaseResources(String partition, String user, int containers, + Resource res) { + _releaseResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.releaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.releaseResources(partition, user, containers, res); + } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(partition, user, containers, res); - } - if (parent != null) { - parent.releaseResources(partition, user, containers, res); - } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.releaseResources(partition, user, containers, res); + } + } + + /** + * Release Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc + */ + private void _releaseResources(int containers, Resource res) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } } @@ -723,12 +843,24 @@ public void updatePreemptedSecondsForCustomResources(Resource res, } public void reserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - reserveResource(user, res); + + _reserveResource(res); + + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.reserveResource(partition, user, res); + } + if (parent != null) { + parent.reserveResource(partition, user, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.reserveResource(partition, user, res); } } - public void reserveResource(String user, Resource res) { + public void _reserveResource(Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); @@ -738,17 +870,27 @@ public void reserveResource(String user, Resource res) { queueMetricsForCustomResources.getReservedValues(), RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } + } + + public void unreserveResource(String partition, String user, Resource res) { + _unreserveResource(res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.reserveResource(user, res); + userMetrics.unreserveResource(partition, user, res); } if (parent != null) { - parent.reserveResource(user, res); + parent.unreserveResource(partition, user, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.unreserveResource(partition, user, res); } } - private void unreserveResource(String user, Resource res) { - reservedContainers.decr(); + public void _unreserveResource(Resource res) { + int containers = 1; + reservedContainers.decr(containers); reservedMB.decr(res.getMemorySize()); reservedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { @@ -757,19 +899,6 @@ private void unreserveResource(String user, Resource res) { queueMetricsForCustomResources.getReservedValues(), RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.unreserveResource(user, res); - } - if (parent != null) { - parent.unreserveResource(user, res); - } - } - - public void unreserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - unreserveResource(user, res); - } } public void incrActiveUsers() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index c50a1b145d3..7548a18fe4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -69,6 +69,13 @@ super(ms, queueName, parent, enableUserMetrics, conf); } + CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf, + boolean partitionQueueMetrics) { + super(ms, queueName, parent, enableUserMetrics, conf, + partitionQueueMetrics); + } + public long getAMResourceLimitMB() { return AMResourceLimitMB.value(); } @@ -201,7 +208,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); if (metrics == null) { - metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new CSQueueMetrics(metricsSystem, queueName, null, false, conf, false); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java new file mode 100644 index 00000000000..4cb6ce61311 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java @@ -0,0 +1,699 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +public class TestPartitionQueueMetrics { + + static final int GB = 1024; // MB + private static final Configuration conf = new Configuration(); + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + PartitionQueueMetrics.clearQueueMetrics(); + } + + @After + public void tearDown() { + ms.shutdown(); + + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + + @Test + public void testSinglePartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + CSQueueMetrics root = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + CSQueueMetrics q1 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q1", parentQueue, true, conf); + CSQueueMetrics q2 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q2", parentQueue, true, conf); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source= queueSource(q1.getMetricsSystem(), "x", "root.q1"); + + //checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + + q2.incrPendingResources( + "x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = + queueSource(q2.getMetricsSystem(), "x", "root.q2"); + + //checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in both partitions, x & y + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testTwoPartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + QueueMetrics q2 = QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, conf); + q1.incrPendingResources( + "x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource xPartitionSource = partitionSource(ms, "x"); + MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(xPartitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(xRootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q2.incrPendingResources( + "y", user, 3, Resource.newInstance(1024, 1)); + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q2Source = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + } + + /** + * Structure: + * + * Both queues, q1 has been configured to run in multiple partitions, x & y + * + * root + * / + * q1 + * + * @throws Exception + */ + @Test + public void testMultiplePartitionWithSingleQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, "root", null, true, conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + q1.incrPendingResources( + "x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q1.incrPendingResources( + "x", "test_user", 3, Resource.newInstance(1024, 1)); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q1Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(userSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q1.incrPendingResources( + "x", "test_user1", 4, Resource.newInstance(1024, 1)); + MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(q1Source, 0, 0, 0, 0, 0, 9*GB, 9, 9); + checkResources(userSource1, 0, 0, 0, 0, 0, 4*GB, 4, 4); + + q1.incrPendingResources( + "y", "test_user1", 6, Resource.newInstance(1024, 1)); + MetricsSource partitionSourceY = partitionSource(ms, "y"); + MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName); + MetricsSource q1SourceY = queueSource(ms, "y", "root.q1"); + MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1"); + + checkResources(partitionSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(rootQueueSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q1SourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(userSourceY, 0, 0, 0, 0, 0, 6*GB, 6, 6); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in both partitions, x & y + * + * root + * / \ + * q1 q2 + * / \ / \ + * q11 q12 q21 q22 + * + * @throws Exception + */ + + @Test + public void testMultiplePartitionsWithMultiLevelQueuesMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + Queue childQueue1 = mock(Queue.class); + Queue childQueue2 = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, "root", null, true,conf); + when(parentQueue.getMetrics()).thenReturn(root); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, conf); + when(childQueue1.getMetrics()).thenReturn(q1); + QueueMetrics q11 = QueueMetrics.forQueue( + ms, "root.q1.q11", childQueue1, true, conf); + QueueMetrics q12 = QueueMetrics.forQueue( + ms, "root.q1.q12", childQueue1, true, conf); + QueueMetrics q2 = QueueMetrics.forQueue( + ms, "root.q2", parentQueue, true, conf); + when(childQueue2.getMetrics()).thenReturn(q2); + QueueMetrics q21 = QueueMetrics.forQueue( + ms, "root.q2.q21", childQueue2, true, conf); + QueueMetrics q22 = QueueMetrics.forQueue( + ms, "root.q2.q22", childQueue2, true, conf); + + q11.incrPendingResources( + "x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q11.incrPendingResources( + "x", "test_user", 4, Resource.newInstance(1024, 1)); + MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q11Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + checkResources(q1Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + + q12.incrPendingResources( + "x", "test_user", 5, Resource.newInstance(1024, 1)); + MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(q1Source, 0, 0, 0, 0, 0, 11*GB, 11, 11); + checkResources(q12Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q12.incrPendingResources( + "y", "test_user", 3, Resource.newInstance(1024, 1)); + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q1YSource = queueSource(ms, "y", "root.q1"); + MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q1YSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q12YSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + + q21.incrPendingResources( + "y", "test_user", 5, Resource.newInstance(1024, 1)); + MetricsSource q21Source= queueSource(ms, "y", "root.q2.q21"); + MetricsSource q2YSource= queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 8*GB, 8, 8); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 8*GB, 8, 8); + checkResources(q2YSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q21Source, 0, 0, 0, 0, 0, 5*GB, 5, 5); + + q22.incrPendingResources( + "y", "test_user", 6, Resource.newInstance(1024, 1)); + MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22"); + + checkResources(yPartitionSource, 0, 0, 0, 0, 0, 14*GB, 14, 14); + checkResources(yRootQueueSource, 0, 0, 0, 0, 0, 14*GB, 14, 14); + checkResources(q22Source, 0, 0, 0, 0, 0, 6*GB, 6, 6); + + } + + @Test + public void testTwoLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + String partition = "x"; + + CSQueueMetrics parentMetrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + CSQueueMetrics metrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName, parentQueue, true, conf); + AppSchedulingInfo app = mockApp(user); + + metrics.submitApp(user); + metrics.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100*GB, 100)); + parentMetrics.setAvailableResourcesToUser(partition, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(partition, + user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(partition, + user, 6, Resources.createResource(3*GB, 3)); + + MetricsSource partitionSource = + partitionSource(metrics.getMetricsSystem(), partition); + MetricsSource parentQueueSource = + queueSource(metrics.getMetricsSystem(), partition, parentQueueName); + MetricsSource queueSource = + queueSource(metrics.getMetricsSystem(), partition, leafQueueName); + MetricsSource userSource = + userSource(metrics.getMetricsSystem(), partition, user, leafQueueName); + MetricsSource userSource1 = + userSource(metrics.getMetricsSystem(), partition, user, + parentQueueName); + + checkResources(queueSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(parentQueueSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource, + 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource1, + 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(partitionSource, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + + metrics.runAppAttempt(app.getApplicationId(), user); + + metrics.allocateResources(partition, + user, 3, Resources.createResource(1*GB, 1), true); + metrics.reserveResource(partition, + user, Resources.createResource(3*GB, 3)); + + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + checkResources(parentQueueSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + checkResources(partitionSource, + 3*GB, 3, 3, 3, 0, 100*GB, 100, 15*GB, 15, 3, 3*GB, 3, 1); + + metrics.allocateResources(partition, + user, 3, Resources.createResource(1*GB, 1), true); + + checkResources(queueSource, + 6*GB, 6, 6, 6, 0, 100*GB, 100, 12*GB, 12, 0, 3*GB, 3, 1); + checkResources(parentQueueSource, + 6*GB, 6, 6, 6, 0, 100*GB, 100, 12*GB, 12, 0, 3*GB, 3, 1); + + metrics.releaseResources(partition, + user, 1, Resources.createResource(2*GB, 2)); + metrics.unreserveResource(partition, + user, Resources.createResource(3*GB, 3)); + checkResources(queueSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + checkResources(parentQueueSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + checkResources(partitionSource, + 4*GB, 4, 5, 6, 1, 100*GB, 100, 12*GB, 12, 0, 0, 0, 0); + + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + + metrics.finishApp(user, RMAppState.FINISHED); + } + + @Test + public void testThreeLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String leafQueueName1 = "root.leaf.leaf1"; + String user = "alice"; + String partitionX = "x"; + String partitionY = "y"; + + CSQueueMetrics parentMetrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + CSQueueMetrics metrics = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName, parentQueue, true, conf); + Queue leafQueue = mock(Queue.class); + when(leafQueue.getMetrics()).thenReturn(metrics); + CSQueueMetrics metrics1 = + (CSQueueMetrics) CSQueueMetrics.forQueue( + leafQueueName1, leafQueue, true, conf); + AppSchedulingInfo app = mockApp(user); + + metrics1.submitApp(user); + metrics1.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(200*GB, 200)); + parentMetrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(500*GB, 500)); + metrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(400*GB, 400)); + metrics1.setAvailableResourcesToQueue(partitionX, + Resources.createResource(50*GB, 50)); + metrics1.setAvailableResourcesToQueue(partitionY, + Resources.createResource(300*GB, 300)); + parentMetrics.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(20*GB, 20)); + parentMetrics.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(50*GB, 50)); + metrics.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(40*GB, 40)); + metrics1.setAvailableResourcesToUser(partitionX, + user, Resources.createResource(5*GB, 5)); + metrics1.setAvailableResourcesToUser(partitionY, + user, Resources.createResource(30*GB, 30)); + metrics1.incrPendingResources(partitionX, + user, 6, Resources.createResource(3*GB, 3)); + metrics1.incrPendingResources(partitionY, + user, 6, Resources.createResource(4*GB, 4)); + + MetricsSource partitionSourceX = + partitionSource(metrics1.getMetricsSystem(), partitionX); + + MetricsSource parentQueueSource_X = + queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName); + MetricsSource queueSource_X = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName); + MetricsSource queueSource1_X = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1); + MetricsSource parentUserSource_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, parentQueueName); + MetricsSource userSource_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, leafQueueName); + MetricsSource userSource1_X = + userSource(metrics1.getMetricsSystem(), partitionX, user, leafQueueName1); + + checkResources(partitionSourceX, + 0, 0, 0, 0, 0, 200*GB, 200, 18*GB, 18, 6, 0, 0, 0); + checkResources(parentQueueSource_X, + 0, 0, 0, 0, 0, 200*GB, 200, 18*GB, 18, 6, 0, 0, 0); + checkResources(queueSource_X, + 0, 0, 0, 0, 0, 100*GB, 100, 18*GB, 18, 6, 0, 0, 0); + checkResources(queueSource1_X, + 0, 0, 0, 0, 0, 50*GB, 50, 18*GB, 18, 6, 0, 0, 0); + //checkResources(parentUserSource_X, + // 0, 0, 0, 0, 0, 20*GB, 20, 18*GB, 18, 6, 0, 0, 0); + //checkResources(userSource_X, + // 0, 0, 0, 0, 0, 10*GB, 10, 18*GB, 18, 6, 0, 0, 0); + checkResources(userSource1_X, + 0, 0, 0, 0, 0, 5*GB, 5, 18*GB, 18, 6, 0, 0, 0); + + MetricsSource partitionSourceY = + partitionSource(metrics1.getMetricsSystem(), partitionY); + + MetricsSource parentQueueSource_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + parentQueueName); + MetricsSource queueSource_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + leafQueueName); + MetricsSource queueSource1_Y = + queueSource(metrics1.getMetricsSystem(), partitionY, + leafQueueName1); + MetricsSource parentUserSource_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + parentQueueName); + MetricsSource userSource_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + leafQueueName); + MetricsSource userSource1_Y = + userSource(metrics1.getMetricsSystem(), partitionY, user, + leafQueueName1); + + checkResources(partitionSourceY, + 0, 0, 0, 0, 0, 500*GB, 500, 24*GB, 24, 6, 0, 0, 0); + checkResources(parentQueueSource_Y, + 0, 0, 0, 0, 0, 500*GB, 500, 24*GB, 24, 6, 0, 0, 0); + checkResources(queueSource_Y, + 0, 0, 0, 0, 0, 400*GB, 400, 24*GB, 24, 6, 0, 0, 0); + checkResources(queueSource1_Y, + 0, 0, 0, 0, 0, 300*GB, 300, 24*GB, 24, 6, 0, 0, 0); + //checkResources(parentUserSource_Y, + // 0, 0, 0, 0, 0, 50*GB, 50, 24*GB, 24, 6, 0, 0, 0); + //checkResources(userSource_Y, + // 0, 0, 0, 0, 0, 40*GB, 40, 24*GB, 24, 6, 0, 0, 0); + checkResources(userSource1_Y, + 0, 0, 0, 0, 0, 30*GB, 30, 24*GB, 24, 6, 0, 0, 0); + + metrics1.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + + metrics1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * UserMetrics has been disabled, hence trying to access the user source + * throws NPE from sources + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test(expected = NullPointerException.class) + public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + CSQueueMetrics root = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root", null, false, conf); + when(parentQueue.getMetrics()).thenReturn(root); + CSQueueMetrics q1 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q1", parentQueue, false, conf); + CSQueueMetrics q2 = (CSQueueMetrics) CSQueueMetrics.forQueue( + "root.q2", parentQueue, false, conf); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + q1.incrPendingResources( + "x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = + partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source= queueSource(q1.getMetricsSystem(), "x", "root.q1"); + MetricsSource q1UserSource= + userSource(q1.getMetricsSystem(), "x", user, "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2*GB, 2, 2); + checkResources(q1UserSource, 0, 0, 0, 0, 0, 2*GB, 2, 2); + + q2.incrPendingResources( + "x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = + queueSource(q2.getMetricsSystem(), "x", "root.q2"); + MetricsSource q2UserSource= + userSource(q1.getMetricsSystem(), "x", user, "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5*GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3*GB, 3, 3); + checkResources(q2UserSource, 0, 0, 0, 0, 0, 3*GB, 3, 3); + + q1.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + q1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * UserMetrics has been disabled, hence trying to access the user source + * throws NPE from sources + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testNewPartition() + throws Exception { + + PartitionQueueMetrics metrics = + (PartitionQueueMetrics) PartitionQueueMetrics.forQueue( + ms, "root.q2", null, false, conf, "x"); + + MetricsSource partitionSource = partitionSource(ms, "x"); + + checkResources(partitionSource, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + } + + public static MetricsSource partitionSource(MetricsSystem ms, + String partition) { + MetricsSource s = ms.getSource( + PartitionQueueMetrics.pSourceName(partition).toString()); + return s; + } + + public static MetricsSource queueSource(MetricsSystem ms, + String partition, String queue) { + MetricsSource s = ms.getSource(PartitionQueueMetrics.pSourceName(partition). + append(PartitionQueueMetrics.qSourceName(queue)).toString()); + return s; + } + + public static MetricsSource userSource(MetricsSystem ms, + String partition, String user, String queue) { + MetricsSource s = ms.getSource(PartitionQueueMetrics.pSourceName(partition). + append(PartitionQueueMetrics.qSourceName(queue)).append(",user="). + append(user).toString()); + return s; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long availableMB, + int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + } + + private static AppSchedulingInfo mockApp(String user) { + AppSchedulingInfo app = mock(AppSchedulingInfo.class); + when(app.getUser()).thenReturn(user); + ApplicationId appId = + BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId id = + BuilderUtils.newApplicationAttemptId(appId, 1); + when(app.getApplicationAttemptId()).thenReturn(id); + return app; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB, + int reservedCores, int reservedCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); + assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); + assertGauge("ReservedContainers", reservedCtnrs, rb); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 33c39290de5..880cb5c6e70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -38,6 +38,7 @@ import org.junit.Test; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED; @@ -84,8 +85,8 @@ public void setUp() { public void testDefaultSingleQueueMetrics() { String queueName = "single"; - QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, - conf); + QueueMetrics metrics = + QueueMetrics.forQueue(ms, queueName, null, false, conf); MetricsSource queueSource= queueSource(ms, queueName); AppSchedulingInfo app = mockApp(USER); @@ -119,15 +120,10 @@ public void testDefaultSingleQueueMetrics() { metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, USER, 3, Resources.createResource(2*GB, 2), true); - rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .checkAgainst(queueSource); + + rmChecker = + checkAllocatedMetricResourceTypes(queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 0, 0, 0); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, USER, 1, Resources.createResource(2*GB, 2)); @@ -248,7 +244,7 @@ public void testSingleQueueWithUserMetrics() { String queueName = "single2"; QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true, - conf); + conf); MetricsSource queueSource = queueSource(ms, queueName); AppSchedulingInfo app = mockApp(USER_2); @@ -313,25 +309,11 @@ public void testSingleQueueWithUserMetrics() { metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, USER_2, 3, Resources.createResource(2*GB, 2), true); resMetricsQueueSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .checkAgainst(queueSource); + checkAllocatedMetricResourceTypes(queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 0, 0, 0); resMetricsUserSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .checkAgainst(userSource); + checkAllocatedMetricResourceTypes(userSource, 6 * GB, 6, 3, 3, 0, 10 * GB, + 10, 9 * GB, 9, 2, 0, 0, 0); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, USER_2, 1, Resources.createResource(2*GB, 2)); @@ -508,58 +490,17 @@ public void testTwoLevelWithUserMetrics() { // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources resMetricsQueueSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(leaf.queueSource); + checkReservedMetricResourceTypes(leaf.queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 3 * GB, 3, 1); resMetricsParentQueueSourceChecker = - ResourceMetricsChecker - .createFromChecker(resMetricsParentQueueSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(root.queueSource); + checkReservedMetricResourceTypes(root.queueSource, 6 * GB, 6, 3, 3, 0, + 100 * GB, 100, 9 * GB, 9, 2, 3 * GB, 3, 1); resMetricsUserSourceChecker = - ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(leaf.userSource); - resMetricsParentUserSourceChecker = ResourceMetricsChecker - .createFromChecker(resMetricsParentUserSourceChecker) - .gaugeLong(ALLOCATED_MB, 6 * GB) - .gaugeInt(ALLOCATED_V_CORES, 6) - .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) - .gaugeLong(PENDING_MB, 9 * GB) - .gaugeInt(PENDING_V_CORES, 9) - .gaugeInt(PENDING_CONTAINERS, 2) - .gaugeLong(RESERVED_MB, 3 * GB) - .gaugeInt(RESERVED_V_CORES, 3) - .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(root.userSource); + checkReservedMetricResourceTypes(leaf.userSource, 6 * GB, 6, 3, 3, 0, + 10 * GB, 10, 9 * GB, 9, 2, 3 * GB, 3, 1); + resMetricsParentUserSourceChecker = + checkReservedMetricResourceTypes(root.userSource, 6 * GB, 6, 3, 3, 0, + 10 * GB, 10, 9 * GB, 9, 2, 3 * GB, 3, 1); leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL, USER, 1, Resources.createResource(2*GB, 2)); @@ -717,6 +658,53 @@ private static void checkAggregatedNodeTypes(MetricsSource source, assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb); } + private ResourceMetricsChecker checkAllocatedMetricResourceTypes( + MetricsSource source, long allocatedMB, int allocatedCores, + int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, + long availableMB, int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs, long reservedMB, int reservedCores, int reservedCtnrs) { + + ResourceMetricsChecker rmChecker = + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(ALLOCATED_MB, allocatedMB) + .gaugeInt(ALLOCATED_V_CORES, allocatedCores) + .gaugeInt(ALLOCATED_CONTAINERS, allocCtnrs) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, aggreAllocCtnrs) + .counter(AGGREGATE_CONTAINERS_RELEASED, aggreReleasedCtnrs) + .gaugeLong(AVAILABLE_MB, availableMB) + .gaugeInt(AVAILABLE_V_CORES, availableCores) + .gaugeLong(PENDING_MB, pendingMB) + .gaugeInt(PENDING_V_CORES, pendingCores) + .gaugeInt(PENDING_CONTAINERS, pendingCtnrs) + .gaugeLong(RESERVED_MB, reservedMB) + .gaugeInt(RESERVED_V_CORES, reservedCores) + .gaugeInt(RESERVED_CONTAINERS, reservedCtnrs).checkAgainst(source); + return rmChecker; + } + + private ResourceMetricsChecker checkReservedMetricResourceTypes( + MetricsSource source, long allocatedMB, int allocatedCores, + int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, + long availableMB, int availableCores, long pendingMB, int pendingCores, + int pendingCtnrs, long reservedMB, int reservedCores, int reservedCtnrs) { + ResourceMetricsChecker rmChecker = + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(ALLOCATED_MB, allocatedMB) + .gaugeInt(ALLOCATED_V_CORES, allocatedCores) + .gaugeInt(ALLOCATED_CONTAINERS, allocCtnrs) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, aggreAllocCtnrs) + .counter(AGGREGATE_CONTAINERS_RELEASED, aggreReleasedCtnrs) + .gaugeLong(AVAILABLE_MB, availableMB) + .gaugeInt(AVAILABLE_V_CORES, availableCores) + .gaugeLong(PENDING_MB, pendingMB) + .gaugeInt(PENDING_V_CORES, pendingCores) + .gaugeInt(PENDING_CONTAINERS, pendingCtnrs) + .gaugeLong(RESERVED_MB, reservedMB) + .gaugeInt(RESERVED_V_CORES, reservedCores) + .gaugeInt(RESERVED_CONTAINERS, reservedCtnrs).checkAgainst(source); + return rmChecker; + } + static AppSchedulingInfo mockApp(String user) { AppSchedulingInfo app = mock(AppSchedulingInfo.class); when(app.getUser()).thenReturn(user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 737db5b0d65..a549b733080 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -1991,10 +1991,10 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); - assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(5 * GB, leafQueueA.getMetrics().getAvailableMB()); assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b"); - assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); + assertEquals(5 * GB, leafQueueB.getMetrics().getAvailableMB()); assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); // app1 -> a @@ -2028,13 +2028,14 @@ public RMNodeLabelsManager createNodeLabelManager() { reportNm2.getAvailableResource().getMemorySize()); assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB()); - assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); - assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); + assertEquals(5 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(5 * GB, leafQueueB.getMetrics().getAvailableMB()); assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); // The total memory tracked by QueueMetrics is 0GB for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() + + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a @@ -2136,8 +2137,8 @@ public RMNodeLabelsManager createNodeLabelManager() { LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); // 3GB is used from label x quota. 1.5 GB is remaining from default label. // 2GB is remaining from label x. - assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB()); - assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(20 * GB / 10, leafQueue.getMetrics().getAvailableMB()); + assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); @@ -2154,12 +2155,13 @@ public RMNodeLabelsManager createNodeLabelManager() { // 3GB is used from label x quota. 2GB used from default label. // So 0.5 GB is remaining from default label. assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB()); - assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); // The total memory tracked by QueueMetrics is 10GB // for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() + + assertEquals(13 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); rm1.close(); @@ -2208,7 +2210,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); - assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(10 * GB, leafQueueA.getMetrics().getAvailableMB()); assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); // app1 -> a @@ -2241,8 +2243,8 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(12 * GB, reportNm2.getAvailableResource().getMemorySize()); - assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); - assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(4 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); // app2 -> a RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", ""); @@ -2273,12 +2275,13 @@ public RMNodeLabelsManager createNodeLabelManager() { reportNm2.getAvailableResource().getMemorySize()); assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); - assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(12 * GB, leafQueueA.getMetrics().getAllocatedMB()); // The total memory tracked by QueueMetrics is 12GB // for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() + + assertEquals(18 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a