diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 02bf3767811..806c80abc29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -573,7 +573,7 @@ public boolean isPlaceBlacklisted(String resourceName, public ContainerRequest allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, - Container containerAllocated) { + RMContainer containerAllocated) { writeLock.lock(); try { if (null != containerAllocated) { @@ -731,7 +731,7 @@ public boolean checkAllocation(NodeType type, SchedulerNode node, } private void updateMetricsForAllocatedContainer(NodeType type, - SchedulerNode node, Container containerAllocated) { + SchedulerNode node, RMContainer containerAllocated) { QueueMetrics metrics = queue.getMetrics(); if (pending) { // once an allocation is done we assume the application is @@ -744,15 +744,20 @@ private void updateMetricsForAllocatedContainer(NodeType type, } public static void updateMetrics(ApplicationId applicationId, NodeType type, - SchedulerNode node, Container containerAllocated, String user, + SchedulerNode node, RMContainer containerAllocated, String user, Queue queue) { LOG.debug("allocate: applicationId={} container={} host={} user={}" - + " resource={} type={}", applicationId, containerAllocated.getId(), - containerAllocated.getNodeId(), user, containerAllocated.getResource(), + + " resource={} type={}", applicationId, + containerAllocated.getContainer().getId(), + containerAllocated.getNodeId(), user, + containerAllocated.getContainer().getResource(), type); if(node != null) { queue.getMetrics().allocateResources(node.getPartition(), user, 1, - containerAllocated.getResource(), true); + containerAllocated.getContainer().getResource(), false); + queue.getMetrics().decrPendingResources( + containerAllocated.getNodeLabelExpression(), user, 1, + containerAllocated.getContainer().getResource()); } queue.getMetrics().incrNodeTypeAggregations(user, type); } @@ -831,4 +836,8 @@ public String getDefaultNodeLabelExpression() { this.readLock.unlock(); } } + + public RMContext getRMContext() { + return this.rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index ab4fc1eb245..799140de660 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -161,11 +161,17 @@ private void cancelPreviousRequest(SchedulerNode schedulerNode, // Decrement the pending using a dummy RR with // resource = prev update req capability if (pendingAsk != null && pendingAsk.getCount() > 0) { + Container container = Container.newInstance(UNDEFINED, + schedulerNode.getNodeID(), "host:port", + pendingAsk.getPerAllocationResource(), + schedulerKey.getPriority(), null); appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode, - schedulerKey, Container.newInstance(UNDEFINED, - schedulerNode.getNodeID(), "host:port", - pendingAsk.getPerAllocationResource(), - schedulerKey.getPriority(), null)); + schedulerKey, + new RMContainerImpl(container, schedulerKey, + appSchedulingInfo.getApplicationAttemptId(), + schedulerNode.getNodeID(), appSchedulingInfo.getUser(), + appSchedulingInfo.getRMContext(), + appPlacementAllocator.getPrimaryRequestedNodePartition())); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java new file mode 100644 index 00000000000..07446725410 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +@Metrics(context = "yarn") +public class PartitionQueueMetrics extends QueueMetrics { + + private String partition; + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, String partition) { + super(ms, queueName, parent, enableUserMetrics, conf); + this.partition = partition; + if(parentQueue != null) { + + String newQueueName = parentQueue.getQueueName(); + if(parentQueue instanceof CSQueue) { + System.out.println("step 1 is csqueue"); + newQueueName = ((CSQueue) parentQueue).getQueuePath(); + } else { + System.out.println("step 2 is csqueue"); + } + String parentMetricName = partition + METRIC_NAME_DELIMITER + newQueueName; + this.parent = QUEUE_METRICS.get(parentMetricName); + } + } + + /** + * Partition * Queue * User Metrics + * + * @param userName Name of the user + * @return QueueMetrics + */ + + /** + * Partition * Queue * User Metrics + * + * Computes Metrics at Partition (Node Label) * Queue * User Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * QueueMetrics (A) + * usermetrics + * QueueMetrics (A1) + * usermetrics + * QueueMetrics (A2) + * usermetrics + * QueueMetrics (B) + * usermetrics + * + * @param partition + * @return QueueMetrics + */ + @Override + public synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + + String partitionJMXStr = + (partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR + : partition; + + QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(this.metricsSystem, + this.queueName, null, false, this.conf, this.partition); + users.put(userName, metrics); + metricsSystem.register(pSourceName(partitionJMXStr).append(qSourceName(queueName)). + append(",user=").append(userName).toString(), + "Metrics for user '" + userName + "' in queue '" + queueName + "'", + ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr). + tag(QUEUE_INFO, queueName)).tag(USER_INFO, + userName)); + } + return metrics; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 6a428b59377..3d01e1d14d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Map.Entry; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; @@ -52,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; @InterfaceAudience.Private @@ -112,17 +112,34 @@ info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + protected static final MetricsInfo PARTITION_INFO = + info("Partition", "Metrics by partition"); static final Splitter Q_SPLITTER = Splitter.on('.').omitEmptyStrings().trimResults(); protected final MetricsRegistry registry; protected final String queueName; - protected final QueueMetrics parent; + protected QueueMetrics parent; + protected final Queue parentQueue; protected final MetricsSystem metricsSystem; protected final Map users; protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + protected final boolean enableUserMetrics; + + protected static final MetricsInfo P_RECORD_INFO = + info("PartitionQueueMetrics", "Metrics for the resource scheduler"); + + // Use "default" to operate NO_LABEL (default) partition internally + public static final String DEFAULT_PARTITION = "default"; + + // Use "" to register NO_LABEL (default) partition into metrics system + public static final String DEFAULT_PARTITION_JMX_STR = ""; + + //Metric Name Delimiter + public static final String METRIC_NAME_DELIMITER = "."; + private static final String ALLOCATED_RESOURCE_METRIC_PREFIX = "AllocatedResource."; private static final String ALLOCATED_RESOURCE_METRIC_DESC = @@ -148,13 +165,17 @@ private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = "Aggregate Preempted Seconds for NAME"; - protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, - boolean enableUserMetrics, Configuration conf) { + public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf) { + registry = new MetricsRegistry(RECORD_INFO); this.queueName = queueName; + this.parent = parent != null ? parent.getMetrics() : null; - this.users = enableUserMetrics ? new HashMap() - : null; + this.parentQueue = parent; + this.users = enableUserMetrics ? new HashMap() : null; + this.enableUserMetrics = enableUserMetrics; + metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); @@ -176,12 +197,25 @@ protected static StringBuilder sourceName(String queueName) { return sb; } - public synchronized - static QueueMetrics forQueue(String queueName, Queue parent, - boolean enableUserMetrics, - Configuration conf) { + static StringBuilder pSourceName(String partition) { + StringBuilder sb = new StringBuilder(P_RECORD_INFO.name()); + sb.append(",partition").append('=').append(partition); + return sb; + } + + static StringBuilder qSourceName(String queueName) { + StringBuilder sb = new StringBuilder(); + int i = 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized static QueueMetrics forQueue(String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf) { return forQueue(DefaultMetricsSystem.instance(), queueName, parent, - enableUserMetrics, conf); + enableUserMetrics, conf); } /** @@ -195,7 +229,7 @@ public synchronized static void clearQueueMetrics() { /** * Simple metrics cache to help prevent re-registrations. */ - private static final Map QUEUE_METRICS = + protected static final Map QUEUE_METRICS = new HashMap(); /** @@ -203,26 +237,22 @@ public synchronized static void clearQueueMetrics() { * * @return A string to {@link QueueMetrics} map. */ - protected static Map getQueueMetrics() { + public static Map getQueueMetrics() { return QUEUE_METRICS; } - public synchronized - static QueueMetrics forQueue(MetricsSystem ms, String queueName, - Queue parent, boolean enableUserMetrics, - Configuration conf) { + public synchronized static QueueMetrics forQueue(MetricsSystem ms, + String queueName, Queue parent, boolean enableUserMetrics, + Configuration conf) { QueueMetrics metrics = QUEUE_METRICS.get(queueName); if (metrics == null) { - metrics = - new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf). - tag(QUEUE_INFO, queueName); - + metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf) + .tag(QUEUE_INFO, queueName); + // Register with the MetricsSystems if (ms != null) { - metrics = - ms.register( - sourceName(queueName).toString(), - "Metrics for queue: " + queueName, metrics); + metrics = ms.register(sourceName(queueName).toString(), + "Metrics for queue: " + queueName, metrics); } QUEUE_METRICS.put(queueName, metrics); } @@ -236,7 +266,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } QueueMetrics metrics = users.get(userName); if (metrics == null) { - metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new QueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), @@ -246,6 +277,95 @@ public synchronized QueueMetrics getUserMetrics(String userName) { return metrics; } + /** + * Partition * Queue Metrics + * + * Computes Metrics at Partition (Node Label) * Queue Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * QueueMetrics (A) + * metrics + * QueueMetrics (A1) + * metrics + * QueueMetrics (A2) + * metrics + * QueueMetrics (B) + * metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + + String partitionJMXStr = partition; + + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + partitionJMXStr = DEFAULT_PARTITION_JMX_STR; + } + + String metricName = partition + METRIC_NAME_DELIMITER + this.queueName; + QueueMetrics metrics = QUEUE_METRICS.get(metricName); + + if (metrics == null) { + QueueMetrics queueMetrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue, + this.enableUserMetrics, this.conf, partition); + metrics = metricsSystem.register( + pSourceName(partitionJMXStr).append(qSourceName(this.queueName)) + .toString(), + "Metrics for queue: " + this.queueName, + queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO, + this.queueName)); + QUEUE_METRICS.put(metricName, queueMetrics); + return queueMetrics; + } else { + return metrics; + } + } + + /** + * Partition Metrics + * + * Computes Metrics at Partition (Node Label) Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * metrics + * + * @param partition + * @return QueueMetrics + */ + private QueueMetrics getPartitionMetrics(String partition) { + + String partitionJMXStr = partition; + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + partitionJMXStr = DEFAULT_PARTITION_JMX_STR; + } + + String metricName = partition + METRIC_NAME_DELIMITER; + QueueMetrics metrics = getQueueMetrics().get(metricName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null, + false, this.conf, partition); + + // Register with the MetricsSystems + if (metricsSystem != null) { + metricsSystem.register(pSourceName(partitionJMXStr).toString(), + "Metrics for partition: " + partitionJMXStr, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)); + } + getQueueMetrics().put(metricName, metrics); + } + return metrics; + } + private ArrayList parseInts(String value) { ArrayList result = new ArrayList(); for(String s: value.split(",")) { @@ -386,20 +506,42 @@ public void moveAppTo(AppSchedulingInfo app) { */ public void setAvailableResourcesToQueue(String partition, Resource limit) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.setAvailable(limit); - registerCustomResources( - queueMetricsForCustomResources.getAvailableValues(), - AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); + _setAvailableResources(limit); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics._setAvailableResources(limit); + + if(this.queueName.equals("root")) { + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._setAvailableResources(limit); + } } } } + /** + * Set Available resources with support for resource vectors. + * + * @param limit + */ + public void _setAvailableResources(Resource limit) { + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.setAvailable(limit); + registerCustomResources( + queueMetricsForCustomResources.getAvailableValues(), + AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); + } + } + /** * Set available resources. To be called by scheduler periodically as * resources become available. + * * @param limit resource limit */ public void setAvailableResourcesToQueue(Resource limit) { @@ -409,42 +551,69 @@ public void setAvailableResourcesToQueue(Resource limit) { /** * Set available resources. To be called by scheduler periodically as * resources become available. + * * @param partition Node Partition * @param user * @param limit resource limit */ - public void setAvailableResourcesToUser(String partition, - String user, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + public void setAvailableResourcesToUser(String partition, String user, + Resource limit) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(partition, limit); + userMetrics._setAvailableResources(limit); + } + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + QueueMetrics partitionUserMetrics = partitionQueueMetrics.getUserMetrics(user); + if(partitionUserMetrics != null) { + partitionUserMetrics._setAvailableResources(limit); } } } - + /** * Increment pending resource metrics + * * @param partition Node Partition * @param user * @param containers - * @param res the TOTAL delta of resources note this is different from - * the other APIs which use per container resource + * @param res the TOTAL delta of resources note this is different from the + * other APIs which use per container resource */ public void incrPendingResources(String partition, String user, int containers, Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(partition, user, containers, res); + internalIncrPendingResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalIncrPendingResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._incrPendingResources(containers, res); } } } + public void internalIncrPendingResources(String partition, String user, + int containers, Resource res) { + _incrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalIncrPendingResources(partition, user, containers, + res); + } + if (parent != null) { + parent.internalIncrPendingResources(partition, user, containers, res); + } + } + protected Map initAndGetCustomResources() { Map customResources = new HashMap(); ResourceInformation[] resources = ResourceUtils.getResourceTypesArray(); @@ -516,18 +685,35 @@ private void _incrPendingResources(int containers, Resource res) { public void decrPendingResources(String partition, String user, int containers, Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(partition, user, containers, res); + internalDecrPendingResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalDecrPendingResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._decrPendingResources(containers, res); } } } + protected void internalDecrPendingResources(String partition, String user, + int containers, Resource res) { + _decrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalDecrPendingResources(partition, user, containers, + res); + } + if (parent != null) { + parent.internalDecrPendingResources(partition, user, containers, res); + } + } + private void _decrPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); @@ -558,35 +744,62 @@ public void incrNodeTypeAggregations(String user, NodeType type) { } } - public void allocateResources(String partition, String user, - int containers, Resource res, boolean decrPending) { + public void allocateResources(String partition, String user, int containers, + Resource res, boolean decrPending) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); - - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res, containers); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } + internalAllocateResources(partition, user, containers, res, decrPending); + } - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, - containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(partition, user, containers, res, decrPending); + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalAllocateResources(partition, user, + containers, res, decrPending); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._allocateResources(containers, res, decrPending); } } } + public void internalAllocateResources(String partition, String user, + int containers, Resource res, boolean decrPending) { + _allocateResources(containers, res, decrPending); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalAllocateResources(partition, user, containers, res, + decrPending); + } + if (parent != null) { + parent.internalAllocateResources(partition, user, containers, res, + decrPending); + } + } + + /** + * Allocate Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc + * @param decrPending decides whether to decrease pending resource or not + */ + private void _allocateResources(int containers, Resource res, + boolean decrPending) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); + } + if (decrPending) { + _decrPendingResources(containers, res); + } + } + /** * Allocate Resource for container size change. * @param partition Node Partition @@ -594,82 +807,80 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } + allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); + } - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res); - registerCustomResources( - queueMetricsForCustomResources.getPendingValues(), - PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); - } + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreasePending(res); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); + } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, res); - } - if (parent != null) { - parent.allocateResources(partition, user, res); - } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, res); + } + if (parent != null) { + parent.allocateResources(partition, user, res); } } + + public void releaseResources(String partition, String user, int containers, + Resource res) { - public void releaseResources(String partition, - String user, int containers, Resource res) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res, containers); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } + internalReleaseResources(partition, user, containers, res); + } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(partition, user, containers, res); - } - if (parent != null) { - parent.releaseResources(partition, user, containers, res); + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalReleaseResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._releaseResources(containers, res); } } } + public void internalReleaseResources(String partition, String user, + int containers, Resource res) { + + _releaseResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalReleaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.internalReleaseResources(partition, user, containers, res); + } + } + /** - * Release Resource for container size change. - * - * @param user - * @param res + * Release Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc */ - private void releaseResources(String user, Resource res) { - allocatedMB.decr(res.getMemorySize()); - allocatedVCores.decr(res.getVirtualCores()); + private void _releaseResources(int containers, Resource res) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res); + queueMetricsForCustomResources.decreaseAllocated(res, containers); registerCustomResources( queueMetricsForCustomResources.getAllocatedValues(), ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } - - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(user, res); - } - if (parent != null) { - parent.releaseResources(user, res); - } } public void preemptContainer() { @@ -728,11 +939,31 @@ public void updatePreemptedSecondsForCustomResources(Resource res, public void reserveResource(String partition, String user, Resource res) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - reserveResource(user, res); + internalReserveResources(partition, user, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalReserveResources(partition, user, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._reserveResource(res); + } } } - public void reserveResource(String user, Resource res) { + protected void internalReserveResources(String partition, String user, + Resource res) { + _reserveResource(res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalReserveResources(partition, user, res); + } + if (parent != null) { + parent.internalReserveResources(partition, user, res); + } + } + + public void _reserveResource(Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); @@ -742,17 +973,37 @@ public void reserveResource(String user, Resource res) { queueMetricsForCustomResources.getReservedValues(), RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } + } + + public void unreserveResource(String partition, String user, Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalUnReserveResources(partition, user, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalUnReserveResources(partition, user, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics._unreserveResource(res); + } + } + } + + protected void internalUnReserveResources(String partition, String user, + Resource res) { + _unreserveResource(res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.reserveResource(user, res); + userMetrics.internalUnReserveResources(partition, user, res); } if (parent != null) { - parent.reserveResource(user, res); + parent.internalUnReserveResources(partition, user, res); } } - - private void unreserveResource(String user, Resource res) { - reservedContainers.decr(); + + public void _unreserveResource(Resource res) { + int containers = 1; + reservedContainers.decr(containers); reservedMB.decr(res.getMemorySize()); reservedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { @@ -761,19 +1012,6 @@ private void unreserveResource(String user, Resource res) { queueMetricsForCustomResources.getReservedValues(), RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.unreserveResource(user, res); - } - if (parent != null) { - parent.unreserveResource(user, res); - } - } - - public void unreserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - unreserveResource(user, res); - } } public void incrActiveUsers() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index e9a0aafe6ee..7b366498bf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import com.google.common.annotations.VisibleForTesting; + @Metrics(context = "yarn") public class CSQueueMetrics extends QueueMetrics { @@ -246,7 +248,7 @@ public synchronized static CSQueueMetrics forQueue(String queueName, return (CSQueueMetrics) metrics; } - + @Override public synchronized QueueMetrics getUserMetrics(String userName) { if (users == null) { @@ -254,7 +256,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); if (metrics == null) { - metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new CSQueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 69f41ef5337..c2f3387efc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1397,8 +1397,8 @@ private Resource getHeadroom(User user, : getQueueMaxResource(partition); Resource headroom = Resources.componentwiseMin( - Resources.subtract(userLimitResource, user.getUsed(partition)), - Resources.subtract(currentPartitionResourceLimit, + Resources.subtractNonNegative(userLimitResource, user.getUsed(partition)), + Resources.subtractNonNegative(currentPartitionResourceLimit, queueUsage.getUsed(partition))); // Normalize it before return headroom = @@ -1718,11 +1718,16 @@ void allocateResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, true); - // Note this is a bit unconventional since it gets the object and modifies - // it here, rather then using set routine - Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = getHeadroom(user, + cachedResourceLimitsForHeadroom.getLimit(), clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUser(nodePartition, userName, + partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug(getQueuePath() + " user=" + userName + " used=" @@ -1761,8 +1766,16 @@ void releaseResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, false); - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = getHeadroom(user, + cachedResourceLimitsForHeadroom.getLimit(), clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUser(nodePartition, userName, + partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f42351c85ed..8f6fb638872 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -609,7 +609,7 @@ public boolean apply(Resource cluster, ResourceCommitRequest label mgr.addToCluserNodeLabels( ImmutableSet.of(NodeLabel.newInstance("x", false))); @@ -2408,6 +2414,63 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + double delta = 0.0001; + CSQueue leafQueue = cs.getQueue("a"); + CSQueue leafQueueB = cs.getQueue("b"); + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + + Map metrics = QueueMetrics.getQueueMetrics(); + MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem(); + QueueMetrics partXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x"); + + QueueMetrics partDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, ""); + QueueMetrics queueAMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a"); + QueueMetrics queueBMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b"); + QueueMetrics queueAPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a"); + QueueMetrics queueAPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a"); + QueueMetrics queueBPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b"); + QueueMetrics queueBPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b"); + QueueMetrics rootMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root"); + + assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + + assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta); + // app1 -> a MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) @@ -2423,15 +2486,14 @@ public RMNodeLabelsManager createNodeLabelManager() { // app1 asks for 3 partition= containers am1.allocate("*", 1 * GB, 3, new ArrayList()); - // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + // app1 gets all resource in partition=x (non-exclusive) Assert.assertEquals(3, schedulerNode1.getNumContainers()); @@ -2447,34 +2509,122 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(9 * GB, reportNm2.getAvailableResource().getMemorySize()); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - // 3GB is used from label x quota. 1.5 GB is remaining from default label. - // 2GB is remaining from label x. - assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB()); + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + + assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta); + assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta); + + assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); + + + assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + QueueMetrics partDefaultQueueAUserMetrics = + (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user", + "root.a"); + QueueMetrics partXQueueAUserMetrics = + (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user", + "root.a"); + QueueMetrics queueAUserMetrics = + (QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user"); + + assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta); + + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + - // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); - // NM2 do couple of heartbeats - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } - SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } - // app1 gets all resource in default partition - Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(1, schedulerNode2.getNumContainers()); + Assert.assertEquals(8, schedulerNode1.getNumContainers()); - // 3GB is used from label x quota. 2GB used from default label. - // So 0.5 GB is remaining from default label. - assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB()); - assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAMetrics.getAllocatedMB()); - // The total memory tracked by QueueMetrics is 10GB - // for the default partition - CSQueue rootQueue = cs.getRootQueue(); - assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), + delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + assertEquals(1, queueAMetrics.getAggregateAllocatedContainers()); + assertEquals(1, queueAMetrics.getAggegatedReleasedContainers()); + assertEquals(1, queueAPartDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(1, queueAPartDefaultMetrics.getAggegatedReleasedContainers()); + + assertEquals(8, partXMetrics.getAggregateAllocatedContainers()); + assertEquals(1, partDefaultMetrics.getAggregateAllocatedContainers()); + + + assertEquals(8, queueAPartXMetrics.getAggregateAllocatedContainers()); + assertEquals(8, queueAPartXMetrics.getAggegatedReleasedContainers()); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); rm1.close(); } @@ -2606,8 +2756,8 @@ public RMNodeLabelsManager createNodeLabelManager() { // The total memory tracked by QueueMetrics is 12GB // for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); @@ -2618,6 +2768,218 @@ public RMNodeLabelsManager createNodeLabelManager() { assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); rm1.close(); } + + @Test + public void testTwoLevelQueueMetricsWithLabels() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 100); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + + csConf.setQueues(queueA, new String[] {"a1"}); + final String queueA1 = queueA + ".a1"; + csConf.setCapacity(queueA1, 100); + + csConf.setAccessibleNodeLabels(queueA1, toSet("x")); + csConf.setCapacityByLabel(queueA1, "x", 100); + csConf.setMaximumCapacityByLabel(queueA1, "x", 100); + + // set node -> label + // label x exclusivity is set to true + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", true))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a"); + LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1"); + assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB()); + + MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem(); + QueueMetrics partXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x"); + QueueMetrics partDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, ""); + QueueMetrics queueAPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a"); + QueueMetrics queueAPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a"); + QueueMetrics queueA1PartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1"); + QueueMetrics queueA1PartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1"); + QueueMetrics queueRootPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root"); + QueueMetrics queueRootPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root"); + + QueueMetrics queueAMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a"); + QueueMetrics queueA1Metrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1"); + QueueMetrics queueRootMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root"); + + assertEquals(12 * GB, queueAMetrics.getAvailableMB()); + assertEquals(12 * GB, queueA1Metrics.getAvailableMB()); + assertEquals(12 * GB, queueRootMetrics.getAvailableMB()); + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + + + assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB()); + assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB()); + assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB()); + assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB()); + assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB()); + assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB()); + + assertEquals(10 * GB, partXMetrics.getAvailableMB()); + assertEquals(12 * GB, partDefaultMetrics.getAvailableMB()); + + // app1 -> a + RMApp app1 = MockRMAppSubmitter.submit(rm1, + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withQueue("a1") + .withAmLabel("x") + .build()); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 5 partition=x containers + am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); + // NM1 do 50 heartbeats + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(12 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(0 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueA1Metrics.getAllocatedMB()); + assertEquals(0 * GB, queueRootMetrics.getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB()); + + assertEquals(6 * GB, partXMetrics.getAllocatedMB()); + assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB()); + + // app2 -> a + RMApp app2 = MockRMAppSubmitter.submit(rm1, + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withQueue("a1") + .withAmLabel("") + .build()); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // app2 asks for 5 partition= containers + am2.allocate("*", 1 * GB, 5, new ArrayList(), ""); + // NM2 do 50 heartbeats + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode2.getNumContainers()); + + reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(6 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // The total memory tracked by QueueMetrics is 12GB + // for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + assertEquals(6 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1Metrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootMetrics.getAllocatedMB()); + + assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB()); + + assertEquals(6 * GB, partXMetrics.getAllocatedMB()); + assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB()); + + // Kill all apps in queue a + cs.killAllAppsInQueue("a1"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); + rm1.close(); + } @Test public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {