diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 4abb7129cc7..9e4badd4178 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -551,7 +551,7 @@ public boolean isPlaceBlacklisted(String resourceName, public ContainerRequest allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, - Container containerAllocated) { + RMContainer containerAllocated) { writeLock.lock(); try { if (null != containerAllocated) { @@ -709,7 +709,7 @@ public boolean checkAllocation(NodeType type, SchedulerNode node, } private void updateMetricsForAllocatedContainer(NodeType type, - SchedulerNode node, Container containerAllocated) { + SchedulerNode node, RMContainer containerAllocated) { QueueMetrics metrics = queue.getMetrics(); if (pending) { // once an allocation is done we assume the application is @@ -722,15 +722,20 @@ private void updateMetricsForAllocatedContainer(NodeType type, } public static void updateMetrics(ApplicationId applicationId, NodeType type, - SchedulerNode node, Container containerAllocated, String user, + SchedulerNode node, RMContainer containerAllocated, String user, Queue queue) { LOG.debug("allocate: applicationId={} container={} host={} user={}" - + " resource={} type={}", applicationId, containerAllocated.getId(), - containerAllocated.getNodeId(), user, containerAllocated.getResource(), + + " resource={} type={}", applicationId, + containerAllocated.getContainer().getId(), + containerAllocated.getNodeId(), user, + containerAllocated.getContainer().getResource(), type); if(node != null) { queue.getMetrics().allocateResources(node.getPartition(), user, 1, - containerAllocated.getResource(), true); + containerAllocated.getContainer().getResource(), false); + queue.getMetrics().decrPendingResources( + containerAllocated.getNodeLabelExpression(), user, 1, + containerAllocated.getContainer().getResource()); } queue.getMetrics().incrNodeTypeAggregations(user, type); } @@ -809,4 +814,8 @@ public String getDefaultNodeLabelExpression() { this.readLock.unlock(); } } + + public RMContext getRMContext() { + return this.rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index ab4fc1eb245..799140de660 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -161,11 +161,17 @@ private void cancelPreviousRequest(SchedulerNode schedulerNode, // Decrement the pending using a dummy RR with // resource = prev update req capability if (pendingAsk != null && pendingAsk.getCount() > 0) { + Container container = Container.newInstance(UNDEFINED, + schedulerNode.getNodeID(), "host:port", + pendingAsk.getPerAllocationResource(), + schedulerKey.getPriority(), null); appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode, - schedulerKey, Container.newInstance(UNDEFINED, - schedulerNode.getNodeID(), "host:port", - pendingAsk.getPerAllocationResource(), - schedulerKey.getPriority(), null)); + schedulerKey, + new RMContainerImpl(container, schedulerKey, + appSchedulingInfo.getApplicationAttemptId(), + schedulerNode.getNodeID(), appSchedulingInfo.getUser(), + appSchedulingInfo.getRMContext(), + appPlacementAllocator.getPrimaryRequestedNodePartition())); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java index cd03aec829c..5f3116c2082 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -49,6 +49,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) { ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partition). tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName)); + getQueueMetrics().put(metricName, metrics); } return metrics; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 3763ef76096..7c9c3c599b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -221,7 +221,7 @@ public synchronized static void clearQueueMetrics() { * * @return A string to {@link QueueMetrics} map. */ - protected static Map getQueueMetrics() { + public static Map getQueueMetrics() { return QUEUE_METRICS; } @@ -251,15 +251,19 @@ public synchronized QueueMetrics getUserMetrics(String userName) { if (users == null) { return null; } - QueueMetrics metrics = users.get(userName); + String metricName = + this.queueName + userName; + QueueMetrics metrics = + users.get(metricName); if (metrics == null) { metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf, false); - users.put(userName, metrics); + users.put(metricName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), "Metrics for user '"+ userName +"' in queue '"+ queueName +"'", metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName)); + QUEUE_METRICS.put(metricName, metrics); } return metrics; } @@ -488,13 +492,13 @@ public void moveAppTo(AppSchedulingInfo app) { public void setAvailableResourcesToQueue(String partition, Resource limit) { _setAvailableResources(limit); + } - QueueMetrics partitionQueueMetrics = - getPartitionQueueMetrics(partition); + public void setAvailableResourcesToQueuePerPartition(String partition, Resource limit) { + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); if (partitionQueueMetrics != null) { partitionQueueMetrics.setAvailableResourcesToQueue(partition, limit); - QueueMetrics partitionMetrics = - getPartitionMetrics(partition); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); if (partitionMetrics != null) { partitionMetrics.setAvailableResourcesToQueue(partition, limit); } @@ -537,19 +541,19 @@ public void setAvailableResourcesToUser(String partition, if (userMetrics != null) { userMetrics.setAvailableResourcesToQueue(partition, limit); } + } - if (parent != null) { - parent.setAvailableResourcesToUser(partition, user, limit); - } - - QueueMetrics partitionQueueMetrics = - getPartitionQueueMetrics(partition); + public void setAvailableResourcesToUserPerPartition(String partition, String user, Resource limit) { + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); if (partitionQueueMetrics != null) { partitionQueueMetrics.setAvailableResourcesToUser(partition, user, limit); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.setAvailableResourcesToUser(partition, user, limit); + } } } - /** * Increment pending resource metrics * @param partition Node Partition diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index b29432df4ff..46d6f82ba1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -198,16 +198,20 @@ public synchronized QueueMetrics getUserMetrics(String userName) { if (users == null) { return null; } - CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); + String metricName = + this.queueName + userName; + CSQueueMetrics metrics = + (CSQueueMetrics) users.get(metricName); if (metrics == null) { metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf, false); - users.put(userName, metrics); + users.put(metricName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), "Metrics for user '" + userName + "' in queue '" + queueName + "'", ((CSQueueMetrics) metrics.tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName)); + getQueueMetrics().put(metricName, metrics); } return metrics; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 5c5f30f329c..ebbe4fe261e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -250,24 +250,50 @@ public static void updateUsedCapacity(final ResourceCalculator rc, } - private static Resource getMaxAvailableResourceToQueuePartition( - final ResourceCalculator rc, CSQueue queue, - Resource cluster, String partition) { + public static Resource computeMaxAvailableResource( + final ResourceCalculator rc, CSQueue queue, Resource clusterResource, + String partition) { + // Calculate guaranteed resource for a label in a queue by below logic. // (total label resource) * (absolute capacity of label in that queue) - Resource queueGuaranteedResource = queue.getEffectiveCapacity(partition); + Resource queueGuaranteedResource = + queue.getEffectiveCapacity(partition); - // Available resource in queue for a specific label will be calculated as + // Available resource in queue for a specific label will be calculated + // as // {(guaranteed resource for a label in a queue) - // (resource usage of that label in the queue)} - Resource available = (Resources.greaterThan(rc, cluster, - queueGuaranteedResource, - queue.getQueueResourceUsage().getUsed(partition))) ? Resources - .componentwiseMax(Resources.subtractFrom(queueGuaranteedResource, - queue.getQueueResourceUsage().getUsed(partition)), Resources - .none()) : Resources.none(); + // Finally accumulate this available resource to get total. + Resource available = + (Resources.greaterThan(rc, clusterResource, + queueGuaranteedResource, + queue.getQueueResourceUsage().getUsed(partition))) + ? Resources.componentwiseMax( + Resources.subtractFrom(queueGuaranteedResource, + queue.getQueueResourceUsage().getUsed(partition)), + Resources.none()) + : Resources.none(); return available; + + } + + private static Resource getMaxAvailableResourceToQueueWithPartitionWise( + final ResourceCalculator rc, CSQueue queue, Resource cluster) { + Set nodeLabels = + queue.getNodeLabelsForQueue(); + Resource totalAvailableResource = + Resources.createResource(0, 0); + for (String partition : nodeLabels) { + Resource available = + computeMaxAvailableResource(rc, queue, cluster, partition); + + // Update Partition Queue Metrics for the given queue and a label + queue.getMetrics().setAvailableResourcesToQueuePerPartition(partition, + available); + Resources.addTo(totalAvailableResource, available); + } + return totalAvailableResource; } /** @@ -298,27 +324,14 @@ public static void updateQueueStatistics( queueResourceUsage.getNodePartitionsSet())) { updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), partition, childQueue); - - // Update queue metrics w.r.t node labels. - // In QueueMetrics, null label is handled the same as NO_LABEL. - // This is because queue metrics for partitions are not tracked. - // In the future, will have to change this when/if queue metrics - // for partitions also get tracked. - childQueue.getMetrics().setAvailableResourcesToQueue( - partition, - getMaxAvailableResourceToQueuePartition(rc, childQueue, - cluster, partition)); } } else { updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), nodePartition, childQueue); - - // Same as above. - childQueue.getMetrics().setAvailableResourcesToQueue( - nodePartition, - getMaxAvailableResourceToQueuePartition(rc, childQueue, - cluster, nodePartition)); } + childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition, + getMaxAvailableResourceToQueueWithPartitionWise(rc, childQueue, + cluster)); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a178f9e9a0b..afcdf1ba443 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1406,8 +1406,9 @@ private Resource getHeadroom(User user, : getQueueMaxResource(partition); Resource headroom = Resources.componentwiseMin( - Resources.subtract(userLimitResource, user.getUsed(partition)), - Resources.subtract(currentPartitionResourceLimit, + Resources.subtractNonNegative(userLimitResource, + user.getUsed(partition)), + Resources.subtractNonNegative(currentPartitionResourceLimit, queueUsage.getUsed(partition))); // Normalize it before return headroom = @@ -1421,6 +1422,19 @@ private Resource getHeadroom(User user, csContext.getClusterResourceUsage().getUsed(partition)); headroom = Resources.min(resourceCalculator, clusterPartitionResource, clusterFreePartitionResource, headroom); + + if (LOG.isDebugEnabled()) { + LOG.debug("Headroom calculation for each partition. user=" + + user.getUserName() + " used=" + queueUsage.getUsed(partition) + + " partition=" + partition + " current partition resource limit=" + + cachedResourceLimitsForHeadroom.getLimit() + + " clusterPartitionResource = " + clusterPartitionResource + + " usedpartitionresource = " + + csContext.getClusterResourceUsage().getUsed(partition) + + " clusterFreePartitionResource =" + + clusterFreePartitionResource + " headroom = " + headroom); + } + return headroom; } @@ -1471,8 +1485,12 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, application.setHeadroomProvider(headroomProvider); - metrics.setAvailableResourcesToUser(nodePartition, user, headroom); + metrics.setAvailableResourcesToUserPerPartition(nodePartition, user, + headroom); + metrics.setAvailableResourcesToUser(nodePartition, user, + getUserTotalHeadroomAcrossPartitions(queueUser, clusterResource)); + return userLimit; } @@ -1493,14 +1511,10 @@ public boolean getRackLocalityFullReset() { /** * - * @param userName - * Name of user who has submitted one/more app to given queue. - * @param clusterResource - * total cluster resource - * @param nodePartition - * partition name - * @param schedulingMode - * scheduling mode + * @param userName Name of user who has submitted one/more app to given queue. + * @param clusterResource total cluster resource + * @param nodePartition partition name + * @param schedulingMode scheduling mod * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY * @return Computed User Limit */ @@ -1727,8 +1741,10 @@ void allocateResource(Resource clusterResource, // Note this is a bit unconventional since it gets the object and modifies // it here, rather then using set routine Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(nodePartition, + metrics.setAvailableResourcesToUserPerPartition(nodePartition, userName, application.getHeadroom()); + metrics.setAvailableResourcesToUser(nodePartition, userName, + getUserTotalHeadroomAcrossPartitions(user, clusterResource)); if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " user=" + userName + " used=" @@ -1741,6 +1757,39 @@ void allocateResource(Resource clusterResource, } } + private Resource getUserTotalHeadroomAcrossPartitions(User user, + Resource clusterResource) { + Resource totalHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(user.getUserName()) != null) { + Resource partitionHeadroom = Resources.createResource(0, 0); + for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), + this.queueUsage.getNodePartitionsSet())) { + + // Can SchedulingMode be always RESPECT_PARTITION_EXCLUSIVITY? Is it + // correct? requires in getting max user limit for calculation + partitionHeadroom = + getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), + clusterResource, + getResourceLimitForActiveUsers(user.getUserName(), + clusterResource, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + partition); + + Resources.addTo(totalHeadroom, partitionHeadroom); + if (LOG.isDebugEnabled()) { + LOG.debug("Total Headroom calculations for each partition. user=" + + user.getUserName() + " used=" + + queueUsage.getUsed(partition) + " partition=" + partition + + "current partition resource limit=" + + cachedResourceLimitsForHeadroom.getLimit() + + " paritition headroom = " + partitionHeadroom + + " total head room=" + totalHeadroom); + } + } + } + return totalHeadroom; + } + void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, RMContainer rmContainer) { @@ -1767,9 +1816,21 @@ void releaseResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, false); - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); - + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = + getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), + clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + + metrics.setAvailableResourcesToUserPerPartition(nodePartition, userName, + partitionHeadroom); + metrics.setAvailableResourcesToUser(nodePartition, userName, + getUserTotalHeadroomAcrossPartitions(user, clusterResource)); + if (LOG.isDebugEnabled()) { LOG.debug( getQueueName() + " used=" + queueUsage.getUsed() + " numContainers=" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 71aa865ccfc..1cd9f02ed0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -604,7 +604,7 @@ public boolean apply(Resource cluster, ResourceCommitRequest label mgr.addToCluserNodeLabels( ImmutableSet.of(NodeLabel.newInstance("x", false))); @@ -2103,6 +2105,69 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + double delta = 0.0001; + CSQueue leafQueue = + cs.getQueue("a"); + CSQueue leafQueueB = + cs.getQueue("b"); + CSQueue rootQueue = + cs.getRootQueue(); + assertEquals(20 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(12.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + + Map metrics = + QueueMetrics.getQueueMetrics(); + QueueMetrics partXMetrics = + metrics.get("x"); + + QueueMetrics partDefaultMetrics = + metrics.get("default"); + QueueMetrics queueAMetrics = + metrics.get("root.a"); + + QueueMetrics queueBMetrics = + metrics.get("root.b"); + QueueMetrics queueAPartDefaultMetrics = + metrics.get("defaultroot.a"); + + QueueMetrics queueBPartDefaultMetrics = + metrics.get("defaultroot.b"); + + QueueMetrics queueAPartXMetrics = metrics.get("xroot.a"); + + + QueueMetrics queueBPartXMetrics = metrics.get("xroot.b"); + + QueueMetrics rootMetrics = metrics.get("root"); + + assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + + assertEquals(20 * GB, rootMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(12.5 * GB, queueBMetrics.getAvailableMB(), delta); + // app1 -> a RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); @@ -2110,13 +2175,13 @@ public RMNodeLabelsManager createNodeLabelManager() { // app1 asks for 3 partition= containers am1.allocate("*", 1 * GB, 3, new ArrayList()); - // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + for (int i = + 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } - SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); for (int i = 0; i < 50; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); } // app1 gets all resource in partition=x (non-exclusive) @@ -2134,35 +2199,116 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(9 * GB, reportNm2.getAvailableResource().getMemorySize()); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - // 3GB is used from label x quota. 1.5 GB is remaining from default label. - // 2GB is remaining from label x. - assertEquals(20 * GB / 10, leafQueue.getMetrics().getAvailableMB()); + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + + assertEquals(4 * GB, queueAMetrics.getAllocatedMB(), delta); + assertEquals(3.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta); + assertEquals(12.5 * GB, queueBMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta); + + assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); - // app1 asks for 1 default partition container + Map metrics1 = + QueueMetrics.getQueueMetrics(); + QueueMetrics partDefaultQueueAUserMetrics = + metrics.get("defaultroot.auser"); + + QueueMetrics partXQueueAUserMetrics = + metrics.get("xroot.auser"); + QueueMetrics queueAUserMetrics = + metrics.get("root.auser"); + + assertEquals(4 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(4 * GB, queueAUserMetrics.getAllocatedMB(), delta); + + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + + am1.allocate("*", 1 * GB, 5, new ArrayList()); - // NM2 do couple of heartbeats - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + for (int i = + 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } - SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + for (int i = + 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } - // app1 gets all resource in default partition - Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(1, schedulerNode2.getNumContainers()); + Assert.assertEquals(8, schedulerNode1.getNumContainers()); - // 3GB is used from label x quota. 2GB used from default label. - // So 0.5 GB is remaining from default label. - assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB()); - assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, queueAMetrics.getAllocatedMB()); - // The total memory tracked by QueueMetrics is 10GB - // for the default partition - CSQueue rootQueue = cs.getRootQueue(); - assertEquals(13 * GB, rootQueue.getMetrics().getAvailableMB() - + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), + delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, queueAUserMetrics.getAllocatedMB(), delta); + + assertEquals(2 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + assertEquals(20 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(12.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + assertEquals(9, queueAMetrics.getAggregateAllocatedContainers()); + assertEquals(9, queueAMetrics.getAggegatedReleasedContainers()); + assertEquals(1, queueAPartDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(1, queueAPartDefaultMetrics.getAggegatedReleasedContainers()); + assertEquals(8, queueAPartXMetrics.getAggregateAllocatedContainers()); + assertEquals(8, queueAPartXMetrics.getAggegatedReleasedContainers()); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(8 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); rm1.close(); } @@ -2210,7 +2356,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); - assertEquals(10 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(22 * GB, leafQueueA.getMetrics().getAvailableMB()); assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); // app1 -> a @@ -2243,7 +2389,7 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(12 * GB, reportNm2.getAvailableResource().getMemorySize()); - assertEquals(4 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(16 * GB, leafQueueA.getMetrics().getAvailableMB()); assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); // app2 -> a @@ -2274,15 +2420,12 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(6 * GB, reportNm2.getAvailableResource().getMemorySize()); - assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(10 * GB, leafQueueA.getMetrics().getAvailableMB()); assertEquals(12 * GB, leafQueueA.getMetrics().getAllocatedMB()); - // The total memory tracked by QueueMetrics is 12GB - // for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(18 * GB, rootQueue.getMetrics().getAvailableMB() - + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(22 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a");