diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index c4c98e2302c..c3adb20c66a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -376,7 +376,7 @@ private void containerBasedPreemptOrKill(CSQueue root, for (String partitionToLookAt : allPartitions) { cloneQueues(root, Resources - .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), + .clone(nlm.getResourceByLabel(partitionToLookAt)), partitionToLookAt); } @@ -656,8 +656,7 @@ public float getMinimumThresholdForIntraQueuePreemption() { @Override public Resource getPartitionResource(String partition) { - return Resources.clone(nlm.getResourceByLabel(partition, - Resources.clone(scheduler.getClusterResource()))); + return Resources.clone(nlm.getResourceByLabel(partition)); } public LinkedHashSet<String> getUnderServedQueuesPerPartition( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 507f696d057..d351b9a9942 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -344,19 +344,29 @@ public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) { writeLock.unlock(); } } - - public Resource getQueueResource(String queueName, Set<String> queueLabels, - Resource clusterResource) { + + /** + * Return the total resources available to this queue according to its node + * labels. If the queue cannot be found, an empty resource will be returned. + * If the queue's labels contain the {@link ANY} label, then null will be + * returned.
+ * + * @param queueName the name of the queue whose accessible resources will be + * returned + * @return the accessible resources for the queue + */ + public Resource getQueueResource(String queueName) { try { readLock.lock(); - if (queueLabels.contains(ANY)) { - return clusterResource; - } Queue q = queueCollections.get(queueName); - if (null == q) { + + if (q == null) { return Resources.none(); + } else if (q.accessibleNodeLabels.contains(ANY)) { + return null; + } else { + return q.resource; } - return q.resource; } finally { readLock.unlock(); } @@ -517,7 +527,7 @@ private void updateResourceMappings(Map before, } } - public Resource getResourceByLabel(String label, Resource clusterResource) { + public Resource getResourceByLabel(String label) { label = normalizeLabel(label); if (label.equals(NO_LABEL)) { return noNodeLabel.getResource(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index db63cd868d0..1d90dbedf60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1199,8 +1199,7 @@ public void recordContainerAllocationTime(long value) { } @Private - public boolean hasPendingResourceRequest(ResourceCalculator rc, - String nodePartition, Resource cluster, + public boolean hasPendingResourceRequest(String nodePartition, SchedulingMode schedulingMode) { // We need to consider unconfirmed allocations if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { @@ -1213,16 +1212,12 @@ public boolean hasPendingResourceRequest(ResourceCalculator rc, // To avoid too many allocation-proposals rejected for non-default // partition allocation if (StringUtils.equals(nodePartition, RMNodeLabelsManager.NO_LABEL)) { - pending = Resources.subtract(pending, Resources + Resources.subtractFromNonNegative(pending, Resources .createResource(unconfirmedAllocatedMem.get(), unconfirmedAllocatedVcores.get())); } - if (Resources.greaterThan(rc, cluster, pending, Resources.none())) { - return true; - } - - return false; + return !Resources.isNone(pending); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index d7c452a1ffc..a2ceee2f3c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -604,22 +604,22 @@ private Resource getCurrentLimitResource(String 
nodePartition, * limit-set-by-parent) */ Resource queueMaxResource = - getQueueMaxResource(nodePartition, clusterResource); + getQueueMaxResource(nodePartition); return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { // When we doing non-exclusive resource allocation, maximum capacity of // all queues on this label equals to total resource with the label. - return labelManager.getResourceByLabel(nodePartition, clusterResource); + return labelManager.getResourceByLabel(nodePartition); } return Resources.none(); } - Resource getQueueMaxResource(String nodePartition, Resource clusterResource) { + Resource getQueueMaxResource(String nodePartition) { return Resources.multiplyAndNormalizeDown(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), + labelManager.getResourceByLabel(nodePartition), queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); } @@ -713,7 +713,7 @@ boolean canAssignToThisQueue(Resource clusterResource, + ", currentUsedCapacity: " + Resources .divide(resourceCalculator, clusterResource, queueUsage.getUsed(nodePartition), labelManager - .getResourceByLabel(nodePartition, clusterResource)) + .getResourceByLabel(nodePartition)) + ", max-capacity: " + queueCapacities .getAbsoluteMaximumCapacity(nodePartition)); } @@ -781,7 +781,7 @@ public void incUsedResource(String nodeLabel, Resource resourceToInc, // ResourceUsage has its own lock, no addition lock needs here. queueUsage.incUsed(nodeLabel, resourceToInc); CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), + labelManager.getResourceByLabel(nodeLabel), nodeLabel, this); if (null != parent) { parent.incUsedResource(nodeLabel, resourceToInc, null); @@ -797,7 +797,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec, // ResourceUsage has its own lock, no addition lock needs here. 
queueUsage.decUsed(nodeLabel, resourceToDec); CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), + labelManager.getResourceByLabel(nodeLabel), nodeLabel, this); if (null != parent) { parent.decUsedResource(nodeLabel, resourceToDec, null); @@ -904,10 +904,10 @@ public boolean accept(Resource cluster, Resource maxResourceLimit; if (allocation.getSchedulingMode() == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxResourceLimit = getQueueMaxResource(partition, cluster); + maxResourceLimit = getQueueMaxResource(partition); } else{ maxResourceLimit = labelManager.getResourceByLabel( - schedulerContainer.getNodePartition(), cluster); + schedulerContainer.getNodePartition()); } if (!Resources.fitsIn(resourceCalculator, cluster, Resources.add(queueUsage.getUsed(partition), netAllocated), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index e1014c11fc8..40e65e84dbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -249,7 +249,7 @@ private static Resource getMaxAvailableResourceToQueue( // Calculate guaranteed resource for a label in a queue by below logic. 
// (total label resource) * (absolute capacity of label in that queue) Resource queueGuranteedResource = Resources.multiply(nlm - .getResourceByLabel(partition, cluster), queue.getQueueCapacities() + .getResourceByLabel(partition), queue.getQueueCapacities() .getAbsoluteCapacity(partition)); // Available resource in queue for a specific label will be calculated as @@ -292,11 +292,11 @@ public static void updateQueueStatistics( for (String partition : Sets.union( queueCapacities.getNodePartitionsSet(), queueResourceUsage.getNodePartitionsSet())) { - updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), + updateUsedCapacity(rc, nlm.getResourceByLabel(partition), partition, childQueue); } } else { - updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), + updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition), nodePartition, childQueue); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7f50272240b..273c31cebdd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1238,8 +1238,7 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet ps, boolean withNodeHeartbeat) { CSAssignment assignment = getRootQueue().assignContainers( getClusterResource(), ps, new ResourceLimits(labelManager - .getResourceByLabel(ps.getPartition(), getClusterResource())), + .getResourceByLabel(ps.getPartition())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1345,8 +1344,7 @@ private CSAssignment allocateOrReserveNewContainers( // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. 
new ResourceLimits(labelManager - .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, - getClusterResource())), + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL)), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); assignment.setSchedulingMode(SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); submitResourceCommitRequest(getClusterResource(), assignment); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f24e30aa1ee..ac2179cb20b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -673,8 +673,7 @@ public Resource getUserAMResourceLimitPerPartition( Resource queuePartitionResource = Resources .multiplyAndNormalizeUp(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, - lastClusterResource), + labelManager.getResourceByLabel(nodePartition), queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); @@ -707,7 +706,7 @@ public Resource calculateAndGetAMResourceLimitPerPartition( */ Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), + labelManager.getResourceByLabel(nodePartition), queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); @@ -1307,14 +1306,14 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit, protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, String partition) { - return getHeadroom(user, queueCurrentLimit, clusterResource, + return getHeadroom(user, queueCurrentLimit, getResourceLimitForActiveUsers(application.getUser(), clusterResource, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); } private Resource getHeadroom(User user, - Resource currentPartitionResourceLimit, Resource clusterResource, + Resource currentPartitionResourceLimit, Resource userLimitResource, String partition) { /** * Headroom is: @@ -1341,7 +1340,7 @@ private Resource getHeadroom(User user, currentPartitionResourceLimit = partition.equals(RMNodeLabelsManager.NO_LABEL) ? currentPartitionResourceLimit - : getQueueMaxResource(partition, clusterResource); + : getQueueMaxResource(partition); Resource headroom = Resources.componentwiseMin( Resources.subtract(userLimitResource, user.getUsed(partition)), @@ -1353,7 +1352,7 @@ private Resource getHeadroom(User user, //headroom = min (unused resourcelimit of a label, calculated headroom ) Resource clusterPartitionResource = - labelManager.getResourceByLabel(partition, clusterResource); + labelManager.getResourceByLabel(partition); Resource clusterFreePartitionResource = Resources.subtract(clusterPartitionResource, csContext.getClusterResourceUsage().getUsed(partition)); @@ -1390,7 +1389,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource headroom = metrics.getUserMetrics(user) == null ? 
Resources.none() : getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), - clusterResource, userLimit, nodePartition); + userLimit, nodePartition); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" @@ -1538,13 +1537,10 @@ private void updateSchedulerHealthForCompletedContainer( /** * Recalculate QueueUsage Ratio. * - * @param clusterResource - * Total Cluster Resource * @param nodePartition * Partition */ - public void recalculateQueueUsageRatio(Resource clusterResource, - String nodePartition) { + public void recalculateQueueUsageRatio(String nodePartition) { try { writeLock.lock(); ResourceUsage queueResourceUsage = getQueueResourceUsage(); @@ -1553,10 +1549,10 @@ public void recalculateQueueUsageRatio(Resource clusterResource, for (String partition : Sets.union( getQueueCapacities().getNodePartitionsSet(), queueResourceUsage.getNodePartitionsSet())) { - usersManager.updateUsageRatio(partition, clusterResource); + usersManager.updateUsageRatio(partition); } } else { - usersManager.updateUsageRatio(nodePartition, clusterResource); + usersManager.updateUsageRatio(nodePartition); } } finally { writeLock.unlock(); @@ -1715,7 +1711,7 @@ private void updateCurrentResourceLimits( new ResourceLimits(currentResourceLimits.getLimit()); Resource queueMaxResource = Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager - .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL), queueCapacities .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), minimumAllocation); @@ -1738,7 +1734,7 @@ public void updateClusterResource(Resource clusterResource, setQueueResourceLimitsInfo(clusterResource); // Update user consumedRatios - recalculateQueueUsageRatio(clusterResource, null); + recalculateQueueUsageRatio(null); // Update metrics CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6800b74f8d4..f09a7561aed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -688,7 +688,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, // Get child's max resource Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown( resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), + labelManager.getResourceByLabel(nodePartition), child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); @@ -996,8 +996,7 @@ private void killContainersToEnforceMaxQueueCapacity(String partition, return; } - Resource partitionResource = labelManager.getResourceByLabel(partition, - null); + Resource partitionResource = labelManager.getResourceByLabel(partition); Resource maxResource = Resources.multiply(partitionResource, getQueueCapacities().getAbsoluteMaximumCapacity(partition)); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 5f7d185c8d2..06dacba7bf2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -675,8 +675,7 @@ private long getLocalVersionOfUsersState(String nodePartition, private Resource computeUserLimit(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { - Resource partitionResource = labelManager.getResourceByLabel(nodePartition, - clusterResource); + Resource partitionResource = labelManager.getResourceByLabel(nodePartition); /* * What is our current capacity? @@ -805,14 +804,11 @@ private Resource computeUserLimit(String userName, Resource clusterResource, * * @param partition * Node partition - * @param clusterResource - * Cluster Resource */ - public void updateUsageRatio(String partition, Resource clusterResource) { + public void updateUsageRatio(String partition) { try { writeLock.lock(); - Resource resourceByLabel = labelManager.getResourceByLabel(partition, - clusterResource); + Resource resourceByLabel = labelManager.getResourceByLabel(partition); float consumed = 0; User user; for (Map.Entry entry : getUsers().entrySet()) { @@ -1044,8 +1040,7 @@ public User updateUserResourceUsage(String userName, Resource resource, userLimitNeedsRecompute(); // Update usage ratios - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - scheduler.getClusterResource()); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition); incQueueUsageRatio(nodePartition, user.updateUsageRatio( resourceCalculator, resourceByLabel, nodePartition)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index f753d31fdbf..b3eca936fe3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -838,8 +838,8 @@ public CSAssignment assignContainers(Resource clusterResource, if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. 
- if (!application.hasPendingResourceRequest(rc, - ps.getPartition(), clusterResource, schedulingMode)) { + if (!application.hasPendingResourceRequest(ps.getPartition(), + schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 17bb104605d..1f77add76fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -913,7 +913,7 @@ protected void getActivedAppDiagnosticMessage( diagnosticMessage.append(" ; "); diagnosticMessage.append("Partition Resource = "); diagnosticMessage.append(rmContext.getNodeLabelManager() - .getResourceByLabel(appAMNodePartitionName, Resources.none())); + .getResourceByLabel(appAMNodePartitionName)); diagnosticMessage.append(" ; "); diagnosticMessage.append("Queue's Absolute capacity = "); diagnosticMessage.append( @@ -984,10 +984,9 @@ public ApplicationResourceUsageReport getResourceUsageReport() { // TODO: improve this writeLock.lock(); ApplicationResourceUsageReport report = super.getResourceUsageReport(); - Resource cluster = rmContext.getScheduler().getClusterResource(); Resource totalPartitionRes = rmContext.getNodeLabelManager().getResourceByLabel( - getAppAMNodePartitionName(), cluster); + getAppAMNodePartitionName()); ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator(); if (!calc.isInvalidDivisor(totalPartitionRes)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 71e6f7fd7df..fcc271f434a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -96,6 +96,8 @@ private final SchedulingPolicy defaultSchedulingPolicy; + private final Map> accessibleNodeLabels; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -128,7 +130,8 @@ public AllocationConfiguration(Map minQueueResources, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, Set reservableQueues, - Set nonPreemptableQueues) { + Set nonPreemptableQueues, + Map> accessibleNodeLabels) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; 
this.maxChildQueueResources = maxChildQueueResources; @@ -152,6 +155,7 @@ public AllocationConfiguration(Map minQueueResources, this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; this.nonPreemptableQueues = nonPreemptableQueues; + this.accessibleNodeLabels = accessibleNodeLabels; } public AllocationConfiguration(Configuration conf) { @@ -181,6 +185,7 @@ public AllocationConfiguration(Configuration conf) { placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); nonPreemptableQueues = new HashSet<>(); + accessibleNodeLabels = new HashMap<>(); } /** @@ -329,6 +334,14 @@ SchedulingPolicy getSchedulingPolicy(String queueName) { return (policy == null) ? defaultSchedulingPolicy : policy; } + public Set getAccessibleNodeLabels(String queueName) { + return accessibleNodeLabels.get(queueName); + } + + public Map> getAccessibleNodeLabels() { + return Collections.unmodifiableMap(accessibleNodeLabels); + } + public SchedulingPolicy getDefaultSchedulingPolicy() { return defaultSchedulingPolicy; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 313a27ae378..fe53b0cc481 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -243,6 +243,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap<>(); Set reservableQueues = new HashSet<>(); Set nonPreemptableQueues = new HashSet<>(); + Map> accessibleNodeLabels = new HashMap<>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; Resource queueMaxResourcesDefault = Resources.unbounded(); @@ -386,7 +387,7 @@ public synchronized void reloadAllocations() throws IOException, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, accessibleNodeLabels); } // Load placement policy and pass it configured queues @@ -436,7 +437,8 @@ public synchronized void reloadAllocations() throws IOException, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, newPlacementPolicy, configuredQueues, - globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); + globalReservationQueueConfig, reservableQueues, nonPreemptableQueues, + accessibleNodeLabels); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -463,7 +465,8 @@ private void loadQueue(String parentName, Element element, Map> resAcls, Map> configuredQueues, Set reservableQueues, - Set nonPreemptableQueues) + Set nonPreemptableQueues, + Map> accessibleNodeLabels) throws AllocationConfigurationException { String queueName = FairSchedulerUtilities.trimQueueName( element.getAttribute("name")); @@ -568,6 +571,21 @@ private void 
loadQueue(String parentName, Element element, if (!Boolean.parseBoolean(text)) { nonPreemptableQueues.add(queueName); } + } else if ("nodeLabels".equals(field.getTagName())) { + String[] nodeLabel = + ((Text)field.getFirstChild()).getData().trim().split(","); + + if ((nodeLabel != null) && (nodeLabel.length > 0)) { + Set nodeLabelSet = new HashSet<>(); + + for (String label : nodeLabel) { + if ((label != null) && (label.length() > 0)) { + nodeLabelSet.add(label); + } + } + + accessibleNodeLabels.put(queueName, nodeLabelSet); + } } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, @@ -575,7 +593,7 @@ private void loadQueue(String parentName, Element element, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, resAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, accessibleNodeLabels); isLeaf = false; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 309dff488f0..be7cea1e7fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -226,6 +228,8 @@ private void subtractResourcesOnBlacklistedNodes( /** * Headroom depends on resources in the cluster, current usage of the * queue, queue's fair-share and queue's max-resources. 
+ * + * @return the headroom as a {@link Resource} instance */ @Override public Resource getHeadroom() { @@ -234,9 +238,12 @@ public Resource getHeadroom() { Resource queueFairShare = fsQueue.getFairShare(); Resource queueUsage = fsQueue.getResourceUsage(); - Resource clusterResource = this.scheduler.getClusterResource(); - Resource clusterUsage = this.scheduler.getRootQueueMetrics() - .getAllocatedResources(); + // We don't have a way to track the resources used per partition, so just + // do the math with the cluster resources. The fair share is already capped + // at the partition max, so we should be fine. + Resource clusterResource = scheduler.getClusterResource(); + Resource clusterUsage = + scheduler.getRootQueueMetrics().getAllocatedResources(); Resource clusterAvailableResources = Resources.subtract(clusterResource, clusterUsage); @@ -1323,18 +1330,43 @@ public void updateDemand() { @Override public Resource assignContainer(FSSchedulerNode node) { + Resource res; + if (isOverAMShareLimit()) { - PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); - updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), - " exceeds maximum AM resource allowed)."); - if (LOG.isDebugEnabled()) { - LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() - + " exceeds maximum AM resource allowed, " - + getQueue().dumpState()); + logOverAMShareLimit(); + res = Resources.none(); + } else if (!nodeLabelsMatchContainerLabel(node.getNodeID())) { + res = Resources.none(); + } else { + res = assignContainer(node, false); + } + + return res; + } + + private void logOverAMShareLimit() { + PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); + updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), + " exceeds maximum AM resource allowed)."); + if (LOG.isDebugEnabled()) { + LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() + + " exceeds maximum AM resource allowed, " + + getQueue().dumpState()); + } + } + + private boolean nodeLabelsMatchContainerLabel(NodeId nodeId) { + Set labelsOnNode = + scheduler.getLabelsManager().getLabelsOnNode(nodeId); + + for (String label : labelsOnNode) { + if (hasPendingResourceRequest(label, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)) { + return true; } - return Resources.none(); } - return assignContainer(node, false); + + return false; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index b911a1ae71a..274f2d417c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -28,6 +28,7 @@ import java.util.TreeSet; import com.google.common.annotations.VisibleForTesting; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -615,6 +617,19 @@ boolean isStarved() { return isStarvedForMinShare() || isStarvedForFairShare(); } + /** + * Test whether this queue should accept this app on the basis of node labels. + * + * @param app the app to test + * @return true if the queue should accept this app based on node labels + */ + boolean acceptAppNodeLabels(RMApp app) { + Set labels = getAccessibleNodeLabels(); + + return labels.contains(app.getAmNodeLabelExpression()) && + labels.contains(app.getAppNodeLabelExpression()); + } + @Override protected void dumpStateInternal(StringBuilder sb) { sb.append("{Name: " + getName() + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 5b4e4dc9e5e..c2caa28a150 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -219,7 +219,7 @@ public Resource assignContainer(FSSchedulerNode node) { try { for (FSQueue child : childQueues) { assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { + if (!Resources.isNone(assigned)) { break; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 10168232a94..181ee8d2bbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @Private @Unstable @@ -75,6 +77,7 @@ protected Resource maxShare; protected int maxRunningApps; protected Resource maxChildQueueResource; + private Set accessibleLabels = null; // maxAMShare is a value between 0 and 1. 
protected float maxAMShare; @@ -106,10 +109,11 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { * * @param recursive whether child queues should be reinitialized recursively */ - public void reinit(boolean recursive) { + public final void reinit(boolean recursive) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); allocConf.initFSQueue(this); updatePreemptionVariables(); + updateNodeLabels(); if (recursive) { for (FSQueue child : getChildQueues()) { @@ -118,7 +122,7 @@ } } - public String getName() { + public final String getName() { return name; } @@ -135,7 +139,7 @@ public FSParentQueue getParent() { return parent; } - public void setPolicy(SchedulingPolicy policy) { + public final void setPolicy(SchedulingPolicy policy) { policy.initialize(scheduler.getContext()); this.policy = policy; } @@ -271,30 +275,56 @@ public FSQueueMetrics getMetrics() { return metrics; } - /** Get the fair share assigned to this Schedulable. */ + /** + * Get the fair share assigned to this Schedulable. + * + * @return the fair share + */ + @Override public Resource getFairShare() { return fairShare; } @Override public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - metrics.setFairShare(fairShare); + Resource partitionMax = scheduler.getLabelsManager().getQueueResource(name); + + if (partitionMax != null) { + this.fairShare = Resources.componentwiseMin(fairShare, partitionMax); + } else { + this.fairShare = fairShare; + } + + metrics.setFairShare(this.fairShare); + if (LOG.isDebugEnabled()) { - LOG.debug("The updated fairShare for " + getName() + " is " + fairShare); + LOG.debug("The updated fairShare for " + name + " is " + this.fairShare); } } - /** Get the steady fair share assigned to this Schedulable. */ + /** + * Get the steady fair share assigned to this Schedulable. + * + * @return the steady fair share + */ public Resource getSteadyFairShare() { return steadyFairShare; } void setSteadyFairShare(Resource steadyFairShare) { - this.steadyFairShare = steadyFairShare; - metrics.setSteadyFairShare(steadyFairShare); + Resource partitionMax = scheduler.getLabelsManager().getQueueResource(name); + + if (partitionMax != null) { + this.steadyFairShare = + Resources.componentwiseMin(steadyFairShare, partitionMax); + } else { + this.steadyFairShare = steadyFairShare; + } + + metrics.setSteadyFairShare(this.steadyFairShare); } + @Override public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return authorizer.checkPermission( new AccessRequest(queueEntity, user, @@ -386,6 +416,18 @@ private void updatePreemptionVariables() { } } + /** + * Update the node labels for this queue. + */ + public void updateNodeLabels() { + accessibleLabels = + scheduler.getAllocationConfiguration().getAccessibleNodeLabels(name); + + if ((accessibleLabels == null) && (parent != null)) { + accessibleLabels = parent.getAccessibleNodeLabels(); + } + } + /** * Gets the children of this queue, if any.
*/ @@ -415,6 +457,17 @@ boolean assignContainerPreCheck(FSSchedulerNode node) { LOG.debug("Assigning container failed on node '" + node.getNodeName() + " because it has reserved containers."); } + + return false; + } else if (!nodeLabelCheck(node.getNodeID())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning container failed on node '" + node.getNodeName() + + " because the node labels don't agree: " + accessibleLabels + + " on the queue v/s " + + scheduler.getLabelsManager().getLabelsOnNode(node.getNodeID()) + + " on the node"); + } + return false; } else if (!Resources.fitsIn(getResourceUsage(), maxShare)) { if (LOG.isDebugEnabled()) { @@ -422,10 +475,40 @@ boolean assignContainerPreCheck(FSSchedulerNode node) { + " because queue resource usage is larger than MaxShare: " + dumpState()); } + return false; - } else { + } + + return true; + } + + /** + * Check if the queue's labels allow it to assign containers on this node. + * + * @param nodeId the ID of the node to check + * @return true if the queue is allowed to assign containers on this node + */ + protected boolean nodeLabelCheck(NodeId nodeId) { + boolean queueHasLabels = (accessibleLabels != null) && + !accessibleLabels.isEmpty(); + + // A queue with no label will accept any node + if (!queueHasLabels) { return true; } + + Set labelsOnNode = + scheduler.getLabelsManager().getLabelsOnNode(nodeId); + + if ((labelsOnNode != null) && !labelsOnNode.isEmpty()) { + for (String queueLabel : accessibleLabels) { + if (labelsOnNode.contains(queueLabel)) { + return true; + } + } + } + + return false; } /** @@ -444,10 +527,9 @@ public String toString() { @Override public Set getAccessibleNodeLabels() { - // TODO, add implementation for FS - return null; + return accessibleLabels; } - + @Override public String getDefaultNodeLabelExpression() { // TODO, add implementation for FS diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a5afa96a133..6ed7d277f3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -105,6 +105,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; /** * A scheduler that schedules resources between a set of queues. The scheduler @@ -188,6 +189,8 @@ @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; + private RMNodeLabelsManager labelManager; + private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; @@ -424,6 +427,16 @@ public FairSchedulerEventLog getEventLog() { } /** + * Convenience method for use by other fair scheduler components to get the + * node labels manager. 
+ * + * @return the node labels manager + */ + RMNodeLabelsManager getLabelsManager() { + return labelManager; + } + + /** * Add a new application to the scheduler, with a given id, queue name, and * user. This will accept a new app even if the user or queue is above * configured limits, but the app will not be marked as runnable. @@ -564,8 +577,11 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { appRejectMsg = "Application rejected by queue placement policy"; } else { queue = queueMgr.getLeafQueue(queueName, true); + if (queue == null) { appRejectMsg = queueName + " is not a leaf queue"; + } else if (!queue.acceptAppNodeLabels(rmApp)) { + appRejectMsg = "Application rejected based on node labels mismatch"; } } } catch (IllegalStateException se) { @@ -1248,10 +1264,25 @@ public void recover(RMState state) throws Exception { // NOT IMPLEMENTED } + /** + * Set the {@link RMContext}. + * + * @param rmContext the {@link RMContext} + */ + @Override public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } + /** + * Return the {@link RMContext}. + * + * @return the {@link RMContext} + */ + RMContext getRmContext() { + return this.rmContext; + } + private void initScheduler(Configuration conf) throws IOException { try { writeLock.lock(); @@ -1293,6 +1324,8 @@ private void initScheduler(Configuration conf) throws IOException { eventLog.init(this.conf); allocConf = new AllocationConfiguration(conf); + labelManager = rmContext.getNodeLabelManager(); + try { queueMgr.initialize(conf); } catch (Exception e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index c08d13e9c48..2d78cc988b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -42,6 +42,7 @@ import java.util.Iterator; import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; /** * Maintains a list of queues as well as scheduling parameters for each queue, @@ -82,6 +83,16 @@ public void initialize(Configuration conf) throws IOException, getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); // Recursively reinitialize to propagate queue properties rootQueue.reinit(true); + + updateNodeLabels(scheduler.getAllocationConfiguration()); + } + + private void updateNodeLabels(AllocationConfiguration allocConf) { + // After the queues are initialized, update the label manager with the + // queue-to-labels mapping. + RMNodeLabelsManager labelsManager = scheduler.getLabelsManager(); + + labelsManager.reinitializeQueueLabels(allocConf.getAccessibleNodeLabels()); } /** @@ -276,10 +287,14 @@ private FSQueue createNewQueues(FSQueueType queueType, String queueName = i.next(); // Check if child policy is allowed - SchedulingPolicy childPolicy = scheduler.getAllocationConfiguration(). 
- getSchedulingPolicy(queueName); + SchedulingPolicy childPolicy = queueConf.getSchedulingPolicy(queueName); + if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) { - LOG.error("Can't create queue '" + queueName + "'."); + LOG.error("Can't create queue '" + queueName + "' because the " + + "configured policy (" + childPolicy.getName() + ") is not " + + "allowed by the parent queue's policy (" + + parent.getPolicy().getName() + ")."); + return null; } @@ -523,6 +538,8 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { rootQueue.reinit(true); // Update steady fair shares for all queues rootQueue.recomputeSteadyShares(); + + updateNodeLabels(queueConf); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 72377b0c096..f8d2f029368 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -98,11 +98,9 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, int queueAvailableCPU = Math.max(queueFairShare.getVirtualCores() - queueUsage .getVirtualCores(), 0); - Resource headroom = Resources.createResource( + return Resources.createResource( Math.min(maxAvailable.getMemorySize(), queueAvailableMemory), - Math.min(maxAvailable.getVirtualCores(), - queueAvailableCPU)); - return headroom; + Math.min(maxAvailable.getVirtualCores(), queueAvailableCPU)); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 4fc0ea490b9..6729c8f4a98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -506,7 +506,7 @@ private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOExceptio p.indexOf(","))); boolean exclusivity = Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length())); - when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class))) + when(nlm.getResourceByLabel(eq(partitionName))) .thenReturn(res); when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index a14a2b13684..7c19ad6b763 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; @@ -1087,7 +1086,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, private void setResourceAndNodeDetails() { when(mCS.getClusterResource()).thenReturn(clusterResources); - when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( + when(lm.getResourceByLabel(anyString())).thenReturn( clusterResources); FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java index 1da6f93f664..a734c47591f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java @@ -88,37 +88,37 @@ public void testGetLabelResourceWhenNodeActiveDeactive() throws Exception { mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); - Assert.assertEquals(mgr.getResourceByLabel("p2", null), EMPTY_RESOURCE); - Assert.assertEquals(mgr.getResourceByLabel("p3", null), EMPTY_RESOURCE); - Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + Assert.assertEquals(mgr.getResourceByLabel("p1"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p3"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL), EMPTY_RESOURCE); // active two NM to n1, one large and one 
small mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Assert.assertEquals(mgr.getResourceByLabel("p1"), Resources.add(SMALL_RESOURCE, LARGE_NODE)); // check add labels multiple times shouldn't overwrite // original attributes on labels like resource mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p4")); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Assert.assertEquals(mgr.getResourceByLabel("p1"), Resources.add(SMALL_RESOURCE, LARGE_NODE)); - Assert.assertEquals(mgr.getResourceByLabel("p4", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p4"), EMPTY_RESOURCE); // change the large NM to small, check if resource updated mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Assert.assertEquals(mgr.getResourceByLabel("p1"), Resources.multiply(SMALL_RESOURCE, 2)); // deactive one NM, and check if resource updated mgr.deactivateNode(NodeId.newInstance("n1", 1)); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p1"), SMALL_RESOURCE); // continus deactive, check if resource updated mgr.deactivateNode(NodeId.newInstance("n1", 2)); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p1"), EMPTY_RESOURCE); // Add two NM to n1 back mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); @@ -126,7 +126,7 @@ public void testGetLabelResourceWhenNodeActiveDeactive() throws Exception { // And remove p1, now the two NM should come to default label, mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1")); - Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL), Resources.add(SMALL_RESOURCE, LARGE_NODE)); } @@ -152,10 +152,10 @@ public void testGetLabelResource() throws Exception { // change label of n1 to p2 mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2"))); - Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); - Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Assert.assertEquals(mgr.getResourceByLabel("p1"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2"), Resources.multiply(SMALL_RESOURCE, 2)); - Assert.assertEquals(mgr.getResourceByLabel("p3", null), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p3"), SMALL_RESOURCE); // add more labels mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p4", "p5", "p6")); @@ -180,16 +180,16 @@ public void testGetLabelResource() throws Exception { mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE); // check varibles - Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE); - Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Assert.assertEquals(mgr.getResourceByLabel("p1"), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2"), Resources.multiply(SMALL_RESOURCE, 3)); - Assert.assertEquals(mgr.getResourceByLabel("p3", null), + Assert.assertEquals(mgr.getResourceByLabel("p3"), Resources.multiply(SMALL_RESOURCE, 2)); - Assert.assertEquals(mgr.getResourceByLabel("p4", null), + Assert.assertEquals(mgr.getResourceByLabel("p4"), Resources.multiply(SMALL_RESOURCE, 1)); - Assert.assertEquals(mgr.getResourceByLabel("p5", null), + 
+    Assert.assertEquals(mgr.getResourceByLabel("p5"),
         Resources.multiply(SMALL_RESOURCE, 1));
-    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL),
         Resources.multiply(SMALL_RESOURCE, 1));
 
     // change a bunch of nodes -> labels
@@ -212,24 +212,22 @@ public void testGetLabelResource() throws Exception {
         toNodeId("n9"), toSet("p1")));
 
     // check varibles
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"),
         Resources.multiply(SMALL_RESOURCE, 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p2"),
         Resources.multiply(SMALL_RESOURCE, 3));
-    Assert.assertEquals(mgr.getResourceByLabel("p3", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p3"),
         Resources.multiply(SMALL_RESOURCE, 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p4", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p4"),
         Resources.multiply(SMALL_RESOURCE, 0));
-    Assert.assertEquals(mgr.getResourceByLabel("p5", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p5"),
         Resources.multiply(SMALL_RESOURCE, 0));
-    Assert.assertEquals(mgr.getResourceByLabel("", null),
+    Assert.assertEquals(mgr.getResourceByLabel(""),
         Resources.multiply(SMALL_RESOURCE, 2));
   }
 
   @Test(timeout=5000)
   public void testGetQueueResource() throws Exception {
-    Resource clusterResource = Resource.newInstance(9999, 1);
-
     /*
      * Node->Labels:
      *   host1 : red
@@ -266,15 +264,14 @@ public void testGetQueueResource() throws Exception {
 
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
 
     mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host2"), toSet("blue")));
     /*
@@ -288,15 +285,14 @@ public void testGetQueueResource() throws Exception {
 
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
 
     /*
      * Check resource after deactive/active some nodes
@@ -312,15 +308,14 @@ public void testGetQueueResource() throws Exception {
 
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
 
     /*
      * Check resource after refresh queue:
@@ -347,15 +342,14 @@ public void testGetQueueResource() throws Exception {
 
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
 
     /*
      * Active NMs in nodes already have NM
@@ -370,15 +364,14 @@ public void testGetQueueResource() throws Exception {
 
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
 
     /*
      * Deactive NMs in nodes already have NMs
@@ -393,15 +386,14 @@ public void testGetQueueResource() throws Exception {
 
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
   }
 
   @Test(timeout=5000)
@@ -414,7 +406,7 @@ public void testGetLabelResourceWhenMultipleNMsExistingInSameHost() throws IOExc
 
     // check resource of no label, it should be small * 4
     Assert.assertEquals(
-        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, null),
+        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL),
         Resources.multiply(SMALL_RESOURCE, 4));
 
     // change two of these nodes to p1, check resource of no_label and P1
@@ -424,10 +416,10 @@ public void testGetLabelResourceWhenMultipleNMsExistingInSameHost() throws IOExc
 
     // check resource
     Assert.assertEquals(
-        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, null),
+        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL),
         Resources.multiply(SMALL_RESOURCE, 2));
     Assert.assertEquals(
-        mgr.getResourceByLabel("p1", null),
+        mgr.getResourceByLabel("p1"),
         Resources.multiply(SMALL_RESOURCE, 2));
   }
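The Q5 assertions above capture the new contract of getQueueResource(String): a queue whose accessible labels include ANY now yields null instead of echoing a caller-supplied cluster resource. A minimal caller-side sketch of the intended handling follows; `scheduler` is a hypothetical reference to the active YARN scheduler, not something this patch provides:

    // A null return means the queue can access every partition, so the
    // caller substitutes the live cluster resource itself.
    Resource accessible = nlm.getQueueResource("Q5");
    if (accessible == null) {
      accessible = scheduler.getClusterResource();
    }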
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index eef86a44990..363ecf7747e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -276,8 +276,8 @@ public static RMContext createRMContext(Configuration conf) {
         new ClientToAMTokenSecretManagerInRM(), null));
 
     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
-    when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
-        any(Resource.class))).thenAnswer(new Answer<Resource>() {
+    when(nlm.getQueueResource(any(String.class)))
+        .thenAnswer(new Answer<Resource>() {
       @Override
       public Resource answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
@@ -285,7 +285,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
       }
     });
 
-    when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
+    when(nlm.getResourceByLabel(any(String.class)))
         .thenAnswer(new Answer<Resource>() {
       @Override
       public Resource answer(InvocationOnMock invocation) throws Throwable {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index d45f756a2e9..35f897094aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -3916,7 +3916,7 @@ public void testApplicationQueuePercent()
     when(rmContext.getRMApps())
         .thenReturn(new ConcurrentHashMap<ApplicationId, RMApp>());
     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
-    when(nlm.getResourceByLabel(any(), any())).thenReturn(res);
+    when(nlm.getResourceByLabel(any())).thenReturn(res);
     when(rmContext.getNodeLabelManager()).thenReturn(nlm);
 
     // Queue "test" consumes 100% of the cluster, so its capacity and absolute
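A note for downstream test code: stubs written against the removed two-argument overloads no longer match anything, so Mockito would silently return null. A minimal sketch of re-stubbing against the one-argument signature; the label names and resource sizes here are illustrative, not taken from this patch:

    // Hypothetical per-label stubbing with the simplified signature.
    RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
    when(nlm.getResourceByLabel("p1"))
        .thenReturn(Resource.newInstance(8 * 1024, 8));
    when(nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL))
        .thenReturn(Resource.newInstance(32 * 1024, 32));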
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 4bc5127e9da..da890b509f6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -109,9 +109,8 @@ public void register(Class<? extends Enum> eventType,
         new NMTokenSecretManagerInRM(conf),
         new ClientToAMTokenSecretManagerInRM());
     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
-    when(
-        nlm.getQueueResource(any(String.class), any(Set.class),
-            any(Resource.class))).thenAnswer(new Answer<Resource>() {
+    when(nlm.getQueueResource(any(String.class)))
+        .thenAnswer(new Answer<Resource>() {
       @Override
       public Resource answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
@@ -119,7 +118,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
       }
     });
 
-    when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
+    when(nlm.getResourceByLabel(any(String.class)))
         .thenAnswer(new Answer<Resource>() {
       @Override
       public Resource answer(InvocationOnMock invocation) throws Throwable {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index af4e1dd32a0..1028999b990 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -118,7 +118,15 @@ protected ResourceRequest createResourceRequest(
   protected ResourceRequest createResourceRequest(
       int memory, int vcores, String host, int priority, int numContainers,
       boolean relaxLocality) {
-    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
+    return createResourceRequest(memory, vcores, host, priority, numContainers,
+        relaxLocality, RMNodeLabelsManager.NO_LABEL);
+  }
+
+  protected ResourceRequest createResourceRequest(
+      int memory, int vcores, String host, int priority, int numContainers,
+      boolean relaxLocality, String label) {
+    ResourceRequest request =
+        recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
     request.setNumContainers(numContainers);
@@ -126,7 +134,8 @@ protected ResourceRequest createResourceRequest(
     prio.setPriority(priority);
     request.setPriority(prio);
     request.setRelaxLocality(relaxLocality);
-    request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    request.setNodeLabelExpression(label);
+
     return request;
   }
 
@@ -163,6 +172,13 @@ protected ApplicationAttemptId createSchedulingRequest(
   protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
+    return createSchedulingRequest(memory, vcores, queueId, userId,
+        numContainers, priority, RMNodeLabelsManager.NO_LABEL);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority, String label) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
@@ -170,9 +186,9 @@ protected ApplicationAttemptId createSchedulingRequest(
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
       scheduler.addApplicationAttempt(id, false, false);
     }
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> ask = new ArrayList<>();
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
+        priority, numContainers, true, label);
     ask.add(request);
 
     RMApp rmApp = mock(RMApp.class);
@@ -188,8 +204,8 @@ protected ApplicationAttemptId createSchedulingRequest(
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
 
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
-        null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(id, ask, new ArrayList<>(), null, null,
+        NULL_UPDATE_REQUESTS);
     scheduler.update();
     return id;
   }
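The new seven-argument overloads thread a node-label expression through to the generated ResourceRequest, while the existing six-argument forms keep their old behavior by delegating with NO_LABEL. A usage sketch matching the test that follows:

    // Ask for three 1024 MB / 1 vcore containers in root.queueB, restricted
    // to nodes carrying "label2".
    ApplicationAttemptId attempt = createSchedulingRequest(
        1024, 1, "root.queueB", "user1", 3, 3, "label2");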
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java
new file mode 100644
index 00000000000..4c67459c7a3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+@SuppressWarnings("unchecked")
+public class TestFSNodeLabel extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE =
+      new File(TEST_DIR, "test-queues").getAbsolutePath();
+
+  @Before
+  public void setUp() throws IOException {
+    scheduler = new FairScheduler();
+    conf = createConfiguration();
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    resourceManager = new ResourceManager();
+    resourceManager.init(conf);
+
+    // TODO: This test should really be using MockRM. For now starting stuff
+    // that is needed at a bare minimum.
+    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    resourceManager.getRMContext().getStateStore().start();
+
+    // to initialize the master key
+    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+    scheduler.setRMContext(resourceManager.getRMContext());
+  }
+
+  @After
+  public void tearDown() {
+    if (scheduler != null) {
+      scheduler.stop();
+      scheduler = null;
+    }
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.shutdown();
+  }
+
+  @Test
+  public void testAssignmentWithNodeLabel() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"root\">");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<label>label2</label>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<label>label3,label4</label>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create 3 nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // add node labels to the cluster
+    Set<NodeLabel> clusterLabel = new HashSet<>();
+    clusterLabel.add(NodeLabel.newInstance("label1"));
+    clusterLabel.add(NodeLabel.newInstance("label2"));
+    clusterLabel.add(NodeLabel.newInstance("label3"));
+    scheduler.getLabelsManager().addToCluserNodeLabels(clusterLabel);
+
+    Map<NodeId, Set<String>> nodeLabelMap = new HashMap<>();
+    // node1: (no label)
+    // node2: label2
+    // node3: label3
+    Set<String> nodeLabel1 = new HashSet<>();
+    nodeLabel1.add(RMNodeLabelsManager.NO_LABEL);
+    Set<String> nodeLabel2 = new HashSet<>();
+    nodeLabel2.add("label2");
+    Set<String> nodeLabel3 = new HashSet<>();
+    nodeLabel3.add("label3");
+    nodeLabelMap.put(node1.getNodeID(), nodeLabel1);
+    nodeLabelMap.put(node2.getNodeID(), nodeLabel2);
+    nodeLabelMap.put(node3.getNodeID(), nodeLabel3);
+    scheduler.getLabelsManager().addLabelsToNode(nodeLabelMap);
+
+    // case 1: an app submitted to a queue without a node label can be
+    // allocated on any node
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1,
+        "root.queueA", "user1", 3, 3);
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeUpdate11 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate12 = new NodeUpdateSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent nodeUpdate13 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeUpdate11);
+    scheduler.handle(nodeUpdate12);
+    scheduler.handle(nodeUpdate13);
+    // app1's containers are expected on node1
+    for (RMContainer container: scheduler.getSchedulerApp(appAttId1).getLiveContainers()) {
+      assertTrue(container.getAllocatedNode().equals(node1.getNodeID()));
+    }
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        appAttId1, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+
+    // case 2: an app submitted to queueB (label2) can only be allocated on
+    // node2; the previous app has finished
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 1,
+        "root.queueB", "user1", 3, 3, "label2");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeUpdate21 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate22 = new NodeUpdateSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent nodeUpdate23 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeUpdate21);
+    scheduler.handle(nodeUpdate22);
+    scheduler.handle(nodeUpdate23);
+    // app2's containers are expected on node2
+    for (RMContainer container: scheduler.getSchedulerApp(appAttId2).getLiveContainers()) {
+      assertTrue(container.getAllocatedNode().equals(node2.getNodeID()));
+    }
+    AppAttemptRemovedSchedulerEvent appRemovedEvent2 = new AppAttemptRemovedSchedulerEvent(
+        appAttId2, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent2);
+    scheduler.update();
+
+    // case 3: an app submitted to queueC (label3,label4) can only be
+    // allocated on node3
+    ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 1,
+        "root.queueC", "user1", 3, 3, "label3");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeUpdate31 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate32 = new NodeUpdateSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent nodeUpdate33 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeUpdate31);
+    scheduler.handle(nodeUpdate32);
+    scheduler.handle(nodeUpdate33);
+    // app3's containers are expected on node3
+    for (RMContainer container: scheduler.getSchedulerApp(appAttId3).getLiveContainers()) {
+      assertTrue(container.getAllocatedNode().equals(node3.getNodeID()));
+    }
+    AppAttemptRemovedSchedulerEvent appRemovedEvent3 = new AppAttemptRemovedSchedulerEvent(
+        appAttId3, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent3);
+    scheduler.update();
+  }
+}
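For reference, testAssignmentWithNodeLabel drives the scheduler from an allocation file along the following lines. The per-queue <label> element is assumed to be the configuration syntax this patch introduces for the fair scheduler, not a pre-existing allocation-file key, and queueA deliberately carries no label so its requests can match any node:

    <?xml version="1.0"?>
    <allocations>
      <queue name="root">
        <queue name="queueA">
          <minResources>1024mb,0vcores</minResources>
        </queue>
        <queue name="queueB">
          <minResources>1024mb,0vcores</minResources>
          <label>label2</label>
        </queue>
        <queue name="queueC">
          <label>label3,label4</label>
        </queue>
      </queue>
    </allocations>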