diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index 66e945fc2ff..709e4e57942 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -130,6 +130,11 @@ public Host copy() { } return c; } + + @Override + public String toString() { + return labels + " : " + nms; + } } protected static class Node { @@ -158,6 +163,11 @@ public Node copy() { c.running = running; return c; } + + @Override + public String toString() { + return nodeId + " (" + labels + ")" + resource; + } } private enum NodeLabelUpdateOperation { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeLabel.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeLabel.java index feeeaf1134e..35f17222a9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeLabel.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeLabel.java @@ -135,4 +135,9 @@ public int hashCode() { return (int) ((((long) labelName.hashCode() << 8) + (resource.hashCode() << 4) + numActiveNMs) % prime); } + + @Override + public String toString() { + return labelName + " " + resource; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index c4c98e2302c..c3adb20c66a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -376,7 +376,7 @@ private void containerBasedPreemptOrKill(CSQueue root, for (String partitionToLookAt : allPartitions) { cloneQueues(root, Resources - .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), + .clone(nlm.getResourceByLabel(partitionToLookAt)), partitionToLookAt); } @@ -656,8 +656,7 @@ public float getMinimumThresholdForIntraQueuePreemption() { @Override public Resource getPartitionResource(String partition) { - return Resources.clone(nlm.getResourceByLabel(partition, - Resources.clone(scheduler.getClusterResource()))); + return Resources.clone(nlm.getResourceByLabel(partition)); } public LinkedHashSet getUnderServedQueuesPerPartition( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 507f696d057..f47068d65d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -52,13 +52,17 @@ protected Queue() { accessibleNodeLabels = - Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); - resource = Resource.newInstance(0, 0); + Collections.newSetFromMap(new ConcurrentHashMap<>()); + resource = Resources.clone(Resources.none()); + } + + @Override + public String toString() { + return accessibleNodeLabels + " : " + resource; + } } - ConcurrentMap<String, Queue> queueCollections = - new ConcurrentHashMap<String, Queue>(); + ConcurrentMap<String, Queue> queueCollections = new ConcurrentHashMap<>(); private YarnAuthorizationProvider authorizer; private RMContext rmContext = null; @@ -320,43 +324,72 @@ public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) { this.queueCollections.clear(); for (Entry<String, Set<String>> entry : queueToLabels.entrySet()) { - String queue = entry.getKey(); - Queue q = new Queue(); - this.queueCollections.put(queue, q); + addQueue(entry.getKey(), entry.getValue(), false); + } + } finally { + writeLock.unlock(); + } + } - Set<String> labels = entry.getValue(); - if (labels.contains(ANY)) { - continue; - } + public void addQueue(String queue, Set<String> labels, boolean setResources) { + Queue q = new Queue(); + writeLock.lock(); + try { + queueCollections.put(queue, q); + + // ANY is the same as none + if ((labels != null) && !labels.contains(ANY)) { q.accessibleNodeLabels.addAll(labels); + for (Host host : nodeCollections.values()) { for (Entry<NodeId, Node> nentry : host.nms.entrySet()) { NodeId nodeId = nentry.getKey(); Node nm = nentry.getValue(); + if (nm.running && isNodeUsableByQueue(getLabelsByNode(nodeId), q)) { Resources.addTo(q.resource, nm.resource); } } } + + if (setResources) { + for (String label : labels) { + Resources.addTo(q.resource, + labelCollections.get(label).getResource()); + } + } + } else if (setResources) { + Resources.addTo(q.resource, + labelCollections.get(NO_LABEL).getResource()); } } finally { writeLock.unlock(); } } - - public Resource getQueueResource(String queueName, Set<String> queueLabels, - Resource clusterResource) { + + /** + * Return the total resources available to this queue according to its node + * labels. If the queue cannot be found, no resources will be returned. If + * the queue's labels contain the {@link ANY} label, then null will be + * returned.
+ * + * @param queueName the name of the queue whose accessible resources will be + * returned + * @return the accessible resources for the queue + */ + public Resource getQueueResource(String queueName) { try { readLock.lock(); - if (queueLabels.contains(ANY)) { - return clusterResource; - } Queue q = queueCollections.get(queueName); - if (null == q) { + + if (q == null) { return Resources.none(); + } else if (q.accessibleNodeLabels.contains(ANY)) { + return null; + } else { + return q.resource; } - return q.resource; } finally { readLock.unlock(); } @@ -483,7 +516,7 @@ private void updateResourceMappings(Map before, newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels)); - // no label in the past + // no label now if (newLabels.isEmpty()) { // update labels RMNodeLabel label = labelCollections.get(NO_LABEL); @@ -517,7 +550,7 @@ private void updateResourceMappings(Map before, } } - public Resource getResourceByLabel(String label, Resource clusterResource) { + public Resource getResourceByLabel(String label) { label = normalizeLabel(label); if (label.equals(NO_LABEL)) { return noNodeLabel.getResource(); @@ -587,4 +620,29 @@ public void setRMContext(RMContext rmContext) { readLock.unlock(); } } + + @Override + public String toString() { + StringBuilder out = new StringBuilder(); + + out.append("LABELS:\n"); + for (Entry e : labelCollections.entrySet()) { + out.append("\t").append(e.getKey()).append(": "); + out.append(e.getValue().toString()).append("\n"); + } + + out.append("QUEUES:\n"); + for (Entry e : queueCollections.entrySet()) { + out.append("\t").append(e.getKey()).append(": "); + out.append(e.getValue().toString()).append("\n"); + } + + out.append("NODES:\n"); + for (Entry e : nodeCollections.entrySet()) { + out.append("\t").append(e.getKey()).append(": "); + out.append(e.getValue().toString()).append("\n"); + } + + return out.toString(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 98192caa28e..8806f3bfae1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1938,10 +1938,15 @@ public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { public String getAppNodeLabelExpression() { String appNodeLabelExpression = getApplicationSubmissionContext().getNodeLabelExpression(); - appNodeLabelExpression = (appNodeLabelExpression == null) - ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : appNodeLabelExpression; - appNodeLabelExpression = (appNodeLabelExpression.trim().isEmpty()) - ? 
NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appNodeLabelExpression; + + if (appNodeLabelExpression == null) { + appNodeLabelExpression = NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET; + } + + if (appNodeLabelExpression.trim().isEmpty()) { + appNodeLabelExpression = NodeLabel.DEFAULT_NODE_LABEL_PARTITION; + } + return appNodeLabelExpression; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index d166e5fc568..9121acbc877 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -75,11 +75,14 @@ public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); /** - * Get labels can be accessed of this queue - * labels={*}, means this queue can access any label - * labels={ }, means this queue cannot access any label except node without label - * labels={a, b, c} means this queue can access a or b or c - * @return labels + * Get the labels that can be accessed by this queue. + * <ul> + * <li>labels={*}, means this queue can access any label</li> + * <li>labels={ }, means this queue cannot access any label except node + * without label</li> + * <li>labels={a, b, c} means this queue can access a or b or c</li> + * </ul> + *
+ * @return the accessible labels */ public Set getAccessibleNodeLabels(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index db63cd868d0..1d90dbedf60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1199,8 +1199,7 @@ public void recordContainerAllocationTime(long value) { } @Private - public boolean hasPendingResourceRequest(ResourceCalculator rc, - String nodePartition, Resource cluster, + public boolean hasPendingResourceRequest(String nodePartition, SchedulingMode schedulingMode) { // We need to consider unconfirmed allocations if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { @@ -1213,16 +1212,12 @@ public boolean hasPendingResourceRequest(ResourceCalculator rc, // To avoid too many allocation-proposals rejected for non-default // partition allocation if (StringUtils.equals(nodePartition, RMNodeLabelsManager.NO_LABEL)) { - pending = Resources.subtract(pending, Resources + Resources.subtractFromNonNegative(pending, Resources .createResource(unconfirmedAllocatedMem.get(), unconfirmedAllocatedVcores.get())); } - if (Resources.greaterThan(rc, cluster, pending, Resources.none())) { - return true; - } - - return false; + return !Resources.isNone(pending); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 250f4e6b9a7..e21365fec60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -604,22 +604,22 @@ private Resource getCurrentLimitResource(String nodePartition, * limit-set-by-parent) */ Resource queueMaxResource = - getQueueMaxResource(nodePartition, clusterResource); + getQueueMaxResource(nodePartition); return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { // When we doing non-exclusive resource allocation, maximum capacity of // all queues on this label equals to total resource with the label. 
- return labelManager.getResourceByLabel(nodePartition, clusterResource); + return labelManager.getResourceByLabel(nodePartition); } return Resources.none(); } - Resource getQueueMaxResource(String nodePartition, Resource clusterResource) { + Resource getQueueMaxResource(String nodePartition) { return Resources.multiplyAndNormalizeDown(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), + labelManager.getResourceByLabel(nodePartition), queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); } @@ -713,7 +713,7 @@ boolean canAssignToThisQueue(Resource clusterResource, + ", currentUsedCapacity: " + Resources .divide(resourceCalculator, clusterResource, queueUsage.getUsed(nodePartition), labelManager - .getResourceByLabel(nodePartition, clusterResource)) + .getResourceByLabel(nodePartition)) + ", max-capacity: " + queueCapacities .getAbsoluteMaximumCapacity(nodePartition)); } @@ -781,7 +781,7 @@ public void incUsedResource(String nodeLabel, Resource resourceToInc, // ResourceUsage has its own lock, no addition lock needs here. queueUsage.incUsed(nodeLabel, resourceToInc); CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), + labelManager.getResourceByLabel(nodeLabel), nodeLabel, this); if (null != parent) { parent.incUsedResource(nodeLabel, resourceToInc, null); @@ -797,7 +797,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec, // ResourceUsage has its own lock, no addition lock needs here. queueUsage.decUsed(nodeLabel, resourceToDec); CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), + labelManager.getResourceByLabel(nodeLabel), nodeLabel, this); if (null != parent) { parent.decUsedResource(nodeLabel, resourceToDec, null); @@ -904,10 +904,10 @@ public boolean accept(Resource cluster, Resource maxResourceLimit; if (allocation.getSchedulingMode() == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxResourceLimit = getQueueMaxResource(partition, cluster); + maxResourceLimit = getQueueMaxResource(partition); } else{ maxResourceLimit = labelManager.getResourceByLabel( - schedulerContainer.getNodePartition(), cluster); + schedulerContainer.getNodePartition()); } if (!Resources.fitsIn(resourceCalculator, Resources.add(queueUsage.getUsed(partition), netAllocated), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index e1014c11fc8..40e65e84dbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -249,7 +249,7 @@ private static Resource getMaxAvailableResourceToQueue( // Calculate guaranteed resource for a label in a queue by below logic. 
// (total label resource) * (absolute capacity of label in that queue) Resource queueGuranteedResource = Resources.multiply(nlm - .getResourceByLabel(partition, cluster), queue.getQueueCapacities() + .getResourceByLabel(partition), queue.getQueueCapacities() .getAbsoluteCapacity(partition)); // Available resource in queue for a specific label will be calculated as @@ -292,11 +292,11 @@ public static void updateQueueStatistics( for (String partition : Sets.union( queueCapacities.getNodePartitionsSet(), queueResourceUsage.getNodePartitionsSet())) { - updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), + updateUsedCapacity(rc, nlm.getResourceByLabel(partition), partition, childQueue); } } else { - updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), + updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition), nodePartition, childQueue); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7f50272240b..273c31cebdd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1238,8 +1238,7 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet ps, boolean withNodeHeartbeat) { CSAssignment assignment = getRootQueue().assignContainers( getClusterResource(), ps, new ResourceLimits(labelManager - .getResourceByLabel(ps.getPartition(), getClusterResource())), + .getResourceByLabel(ps.getPartition())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1345,8 +1344,7 @@ private CSAssignment allocateOrReserveNewContainers( // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. 
new ResourceLimits(labelManager - .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, - getClusterResource())), + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL)), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); assignment.setSchedulingMode(SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); submitResourceCommitRequest(getClusterResource(), assignment); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f24e30aa1ee..ac2179cb20b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -673,8 +673,7 @@ public Resource getUserAMResourceLimitPerPartition( Resource queuePartitionResource = Resources .multiplyAndNormalizeUp(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, - lastClusterResource), + labelManager.getResourceByLabel(nodePartition), queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); @@ -707,7 +706,7 @@ public Resource calculateAndGetAMResourceLimitPerPartition( */ Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), + labelManager.getResourceByLabel(nodePartition), queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); @@ -1307,14 +1306,14 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit, protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, String partition) { - return getHeadroom(user, queueCurrentLimit, clusterResource, + return getHeadroom(user, queueCurrentLimit, getResourceLimitForActiveUsers(application.getUser(), clusterResource, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); } private Resource getHeadroom(User user, - Resource currentPartitionResourceLimit, Resource clusterResource, + Resource currentPartitionResourceLimit, Resource userLimitResource, String partition) { /** * Headroom is: @@ -1341,7 +1340,7 @@ private Resource getHeadroom(User user, currentPartitionResourceLimit = partition.equals(RMNodeLabelsManager.NO_LABEL) ? currentPartitionResourceLimit - : getQueueMaxResource(partition, clusterResource); + : getQueueMaxResource(partition); Resource headroom = Resources.componentwiseMin( Resources.subtract(userLimitResource, user.getUsed(partition)), @@ -1353,7 +1352,7 @@ private Resource getHeadroom(User user, //headroom = min (unused resourcelimit of a label, calculated headroom ) Resource clusterPartitionResource = - labelManager.getResourceByLabel(partition, clusterResource); + labelManager.getResourceByLabel(partition); Resource clusterFreePartitionResource = Resources.subtract(clusterPartitionResource, csContext.getClusterResourceUsage().getUsed(partition)); @@ -1390,7 +1389,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource headroom = metrics.getUserMetrics(user) == null ? 
Resources.none() : getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), - clusterResource, userLimit, nodePartition); + userLimit, nodePartition); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" @@ -1538,13 +1537,10 @@ private void updateSchedulerHealthForCompletedContainer( /** * Recalculate QueueUsage Ratio. * - * @param clusterResource - * Total Cluster Resource * @param nodePartition * Partition */ - public void recalculateQueueUsageRatio(Resource clusterResource, - String nodePartition) { + public void recalculateQueueUsageRatio(String nodePartition) { try { writeLock.lock(); ResourceUsage queueResourceUsage = getQueueResourceUsage(); @@ -1553,10 +1549,10 @@ public void recalculateQueueUsageRatio(Resource clusterResource, for (String partition : Sets.union( getQueueCapacities().getNodePartitionsSet(), queueResourceUsage.getNodePartitionsSet())) { - usersManager.updateUsageRatio(partition, clusterResource); + usersManager.updateUsageRatio(partition); } } else { - usersManager.updateUsageRatio(nodePartition, clusterResource); + usersManager.updateUsageRatio(nodePartition); } } finally { writeLock.unlock(); @@ -1715,7 +1711,7 @@ private void updateCurrentResourceLimits( new ResourceLimits(currentResourceLimits.getLimit()); Resource queueMaxResource = Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager - .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL), queueCapacities .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), minimumAllocation); @@ -1738,7 +1734,7 @@ public void updateClusterResource(Resource clusterResource, setQueueResourceLimitsInfo(clusterResource); // Update user consumedRatios - recalculateQueueUsageRatio(clusterResource, null); + recalculateQueueUsageRatio(null); // Update metrics CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6800b74f8d4..f09a7561aed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -688,7 +688,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, // Get child's max resource Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown( resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), + labelManager.getResourceByLabel(nodePartition), child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); @@ -996,8 +996,7 @@ private void killContainersToEnforceMaxQueueCapacity(String partition, return; } - Resource partitionResource = labelManager.getResourceByLabel(partition, - null); + Resource partitionResource = labelManager.getResourceByLabel(partition); Resource maxResource = Resources.multiply(partitionResource, getQueueCapacities().getAbsoluteMaximumCapacity(partition)); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 33f30b00412..00676d75e21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -675,8 +675,7 @@ private long getLocalVersionOfUsersState(String nodePartition, private Resource computeUserLimit(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { - Resource partitionResource = labelManager.getResourceByLabel(nodePartition, - clusterResource); + Resource partitionResource = labelManager.getResourceByLabel(nodePartition); /* * What is our current capacity? @@ -807,14 +806,11 @@ private Resource computeUserLimit(String userName, Resource clusterResource, * * @param partition * Node partition - * @param clusterResource - * Cluster Resource */ - public void updateUsageRatio(String partition, Resource clusterResource) { + public void updateUsageRatio(String partition) { try { writeLock.lock(); - Resource resourceByLabel = labelManager.getResourceByLabel(partition, - clusterResource); + Resource resourceByLabel = labelManager.getResourceByLabel(partition); float consumed = 0; User user; for (Map.Entry entry : getUsers().entrySet()) { @@ -1046,8 +1042,7 @@ public User updateUserResourceUsage(String userName, Resource resource, userLimitNeedsRecompute(); // Update usage ratios - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - scheduler.getClusterResource()); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition); incQueueUsageRatio(nodePartition, user.updateUsageRatio( resourceCalculator, resourceByLabel, nodePartition)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 72dfbdd6dfb..2471ccbff2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -834,8 +834,8 @@ public CSAssignment assignContainers(Resource clusterResource, if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. 
- if (!application.hasPendingResourceRequest(rc, - ps.getPartition(), clusterResource, schedulingMode)) { + if (!application.hasPendingResourceRequest(ps.getPartition(), + schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a12c5ec7f68..f790295b19b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -908,7 +908,7 @@ protected void getActivedAppDiagnosticMessage( diagnosticMessage.append(" ; "); diagnosticMessage.append("Partition Resource = "); diagnosticMessage.append(rmContext.getNodeLabelManager() - .getResourceByLabel(appAMNodePartitionName, Resources.none())); + .getResourceByLabel(appAMNodePartitionName)); diagnosticMessage.append(" ; "); diagnosticMessage.append("Queue's Absolute capacity = "); diagnosticMessage.append( @@ -979,10 +979,9 @@ public ApplicationResourceUsageReport getResourceUsageReport() { // TODO: improve this writeLock.lock(); ApplicationResourceUsageReport report = super.getResourceUsageReport(); - Resource cluster = rmContext.getScheduler().getClusterResource(); Resource totalPartitionRes = rmContext.getNodeLabelManager().getResourceByLabel( - getAppAMNodePartitionName(), cluster); + getAppAMNodePartitionName()); ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator(); if (!calc.isInvalidDivisor(totalPartitionRes)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 7bd69594856..89d352a9f6e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -95,6 +95,8 @@ private final SchedulingPolicy defaultSchedulingPolicy; + private final Map> accessibleNodeLabels; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -129,7 +131,8 @@ public AllocationConfiguration(Map minQueueResources, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, Set reservableQueues, - Set nonPreemptableQueues) { + Set nonPreemptableQueues, + Map> accessibleNodeLabels) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; 
this.maxChildQueueResources = maxChildQueueResources; @@ -153,6 +156,7 @@ public AllocationConfiguration(Map minQueueResources, this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; this.nonPreemptableQueues = nonPreemptableQueues; + this.accessibleNodeLabels = accessibleNodeLabels; } public AllocationConfiguration(Configuration conf) { @@ -182,6 +186,7 @@ public AllocationConfiguration(Configuration conf) { placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); nonPreemptableQueues = new HashSet<>(); + accessibleNodeLabels = new HashMap<>(); } /** @@ -330,6 +335,14 @@ SchedulingPolicy getSchedulingPolicy(String queueName) { return (policy == null) ? defaultSchedulingPolicy : policy; } + public Set getAccessibleNodeLabels(String queueName) { + return accessibleNodeLabels.get(queueName); + } + + public Map> getAccessibleNodeLabels() { + return Collections.unmodifiableMap(accessibleNodeLabels); + } + public SchedulingPolicy getDefaultSchedulingPolicy() { return defaultSchedulingPolicy; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 4d918c1d6f4..830fff4463f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -58,6 +58,7 @@ import org.xml.sax.SAXException; import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; @Public @Unstable @@ -242,6 +243,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap<>(); Set reservableQueues = new HashSet<>(); Set nonPreemptableQueues = new HashSet<>(); + Map> accessibleNodeLabels = new HashMap<>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; Resource queueMaxResourcesDefault = Resources.unbounded(); @@ -385,7 +387,7 @@ public synchronized void reloadAllocations() throws IOException, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, accessibleNodeLabels); } // Load placement policy and pass it configured queues @@ -435,7 +437,8 @@ public synchronized void reloadAllocations() throws IOException, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, newPlacementPolicy, configuredQueues, - globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); + globalReservationQueueConfig, reservableQueues, nonPreemptableQueues, + accessibleNodeLabels); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -462,7 +465,8 @@ private void loadQueue(String parentName, Element element, Map> resAcls, Map> configuredQueues, Set reservableQueues, - Set nonPreemptableQueues) + Set nonPreemptableQueues, + Map> accessibleNodeLabels) throws 
AllocationConfigurationException { String queueName = FairSchedulerUtilities.trimQueueName( element.getAttribute("name")); @@ -567,6 +571,21 @@ private void loadQueue(String parentName, Element element, if (!Boolean.parseBoolean(text)) { nonPreemptableQueues.add(queueName); } + } else if ("nodeLabels".equals(field.getTagName())) { + String[] nodeLabel = + ((Text)field.getFirstChild()).getData().trim().split(","); + + if ((nodeLabel != null) && (nodeLabel.length > 0)) { + Set nodeLabelSet = new HashSet<>(); + + for (String label : nodeLabel) { + if ((label != null) && (label.length() > 0)) { + nodeLabelSet.add(label); + } + } + + accessibleNodeLabels.put(queueName, nodeLabelSet); + } } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, @@ -574,7 +593,7 @@ private void loadQueue(String parentName, Element element, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, resAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, accessibleNodeLabels); isLeaf = false; } } @@ -593,6 +612,10 @@ private void loadQueue(String parentName, Element element, configuredQueues.get(FSQueueType.PARENT).add(queueName); } + if (!accessibleNodeLabels.containsKey(queueName)) { + accessibleNodeLabels.put(queueName, Collections.EMPTY_SET); + } + // Set default acls if not defined // The root queue defaults to all access for (QueueACL acl : QueueACL.values()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 30245586a0d..5c873b85b48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -219,6 +221,8 @@ private void subtractResourcesOnBlacklistedNodes( /** * 
Headroom depends on resources in the cluster, current usage of the * queue, queue's fair-share and queue's max-resources. + * + * @return the headroom as a {@link Resource} instance */ @Override public Resource getHeadroom() { @@ -227,9 +231,12 @@ public Resource getHeadroom() { Resource queueFairShare = fsQueue.getFairShare(); Resource queueUsage = fsQueue.getResourceUsage(); - Resource clusterResource = this.scheduler.getClusterResource(); - Resource clusterUsage = this.scheduler.getRootQueueMetrics() - .getAllocatedResources(); + // We don't have a way to track the resources used per partition, so just + // do the math with the cluster resources. The fair share is already capped + // at the partition max, so we should be fine. + Resource clusterResource = scheduler.getClusterResource(); + Resource clusterUsage = + scheduler.getRootQueueMetrics().getAllocatedResources(); Resource clusterAvailableResources = Resources.subtract(clusterResource, clusterUsage); @@ -1316,18 +1323,48 @@ public void updateDemand() { @Override public Resource assignContainer(FSSchedulerNode node) { + Resource res; + if (isOverAMShareLimit()) { - PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); - updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), - " exceeds maximum AM resource allowed)."); - if (LOG.isDebugEnabled()) { - LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() - + " exceeds maximum AM resource allowed, " - + getQueue().dumpState()); + logOverAMShareLimit(); + res = Resources.none(); + } else if (!nodeLabelsMatchContainerLabel(node.getNodeID())) { + res = Resources.none(); + } else { + res = assignContainer(node, false); + } + + return res; + } + + private void logOverAMShareLimit() { + PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); + updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), + " exceeds maximum AM resource allowed)."); + if (LOG.isDebugEnabled()) { + LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() + + " exceeds maximum AM resource allowed, " + + getQueue().dumpState()); + } + } + + private boolean nodeLabelsMatchContainerLabel(NodeId nodeId) { + Set labelsOnNode = + scheduler.getLabelsManager().getLabelsOnNode(nodeId); + + if ((labelsOnNode != null) && !labelsOnNode.isEmpty()) { + for (String label : labelsOnNode) { + if (hasPendingResourceRequest(label, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)) { + return true; + } } - return Resources.none(); + + return false; + } else { + return hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } - return assignContainer(node, false); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 1dcfffcd1d8..b28ee2038a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -28,6 +28,7 @@ import java.util.TreeSet; import com.google.common.annotations.VisibleForTesting; +import java.util.Set; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -614,6 +616,28 @@ boolean isStarved() { return isStarvedForMinShare() || isStarvedForFairShare(); } + /** + * Test whether this queue should accept this app on the basis of node labels. + * A queue with no labels accepts all requests. An app with no labels will + * always be accepted by any queue. + * + * @param app the app to test + * @return true if the queue should accept this app based on node labels + */ + boolean acceptAppNodeLabels(RMApp app) { + if ((accessibleLabels != null) && !accessibleLabels.isEmpty()) { + String appLabel = app.getAppNodeLabelExpression(); + String amLabel = app.getAmNodeLabelExpression(); + + return ((appLabel == null) || appLabel.isEmpty() || + accessibleLabels.contains(appLabel)) && + ((amLabel == null) || amLabel.isEmpty() || + accessibleLabels.contains(amLabel)); + } + + return true; + } + @Override protected void dumpStateInternal(StringBuilder sb) { sb.append("{Name: " + getName() + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 5b4e4dc9e5e..c2caa28a150 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -219,7 +219,7 @@ public Resource assignContainer(FSSchedulerNode node) { try { for (FSQueue child : childQueues) { assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { + if (!Resources.isNone(assigned)) { break; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 8ae3cb69496..230abde4faf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -47,6 +47,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import 
org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @Private @Unstable @@ -74,6 +77,8 @@ protected Resource maxShare; protected int maxRunningApps; protected Resource maxChildQueueResource; + protected Set accessibleLabels = null; + private boolean hasLabels = false; // maxAMShare is a value between 0 and 1. protected float maxAMShare; @@ -105,10 +110,11 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { * * @param recursive whether child queues should be reinitialized recursively */ - public void reinit(boolean recursive) { + public final void reinit(boolean recursive) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); allocConf.initFSQueue(this); updatePreemptionVariables(); + updateNodeLabels(); if (recursive) { for (FSQueue child : getChildQueues()) { @@ -117,6 +123,7 @@ public void reinit(boolean recursive) { } } + @Override public String getName() { return name; } @@ -134,7 +141,7 @@ public FSParentQueue getParent() { return parent; } - public void setPolicy(SchedulingPolicy policy) { + public final void setPolicy(SchedulingPolicy policy) { policy.initialize(scheduler.getContext()); this.policy = policy; } @@ -270,30 +277,56 @@ public FSQueueMetrics getMetrics() { return metrics; } - /** Get the fair share assigned to this Schedulable. */ + /** + * Get the fair share assigned to this Schedulable. + * + * @return the fair share + */ + @Override public Resource getFairShare() { return fairShare; } @Override public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - metrics.setFairShare(fairShare); + Resource partitionMax = scheduler.getLabelsManager().getQueueResource(name); + + if (partitionMax != null) { + this.fairShare = Resources.componentwiseMin(fairShare, partitionMax); + } else { + this.fairShare = fairShare; + } + + metrics.setFairShare(this.fairShare); + if (LOG.isDebugEnabled()) { - LOG.debug("The updated fairShare for " + getName() + " is " + fairShare); + LOG.debug("The updated fairShare for " + name + " is " + this.fairShare); } } - /** Get the steady fair share assigned to this Schedulable. */ + /** + * Get the steady fair share assigned to this Schedulable. 
+ * + * @return the steady fair share + */ public Resource getSteadyFairShare() { return steadyFairShare; } void setSteadyFairShare(Resource steadyFairShare) { - this.steadyFairShare = steadyFairShare; - metrics.setSteadyFairShare(steadyFairShare); + Resource partitionMax = scheduler.getLabelsManager().getQueueResource(name); + + if (partitionMax != null) { + this.steadyFairShare = + Resources.componentwiseMin(steadyFairShare, partitionMax); + } else { + this.steadyFairShare = steadyFairShare; + } + + metrics.setSteadyFairShare(this.steadyFairShare); } + @Override public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return authorizer.checkPermission( new AccessRequest(queueEntity, user, @@ -385,6 +418,26 @@ private void updatePreemptionVariables() { } } + /** + * Update node label for this queue + */ + public void updateNodeLabels() { + accessibleLabels = + scheduler.getAllocationConfiguration().getAccessibleNodeLabels(name); + + if ((accessibleLabels == null) && (parent != null)) { + accessibleLabels = parent.getAccessibleNodeLabels(); + } + + if (accessibleLabels == null) { + accessibleLabels = Collections.EMPTY_SET; + } + + // No label and the "any" label are equivalent + hasLabels = !accessibleLabels.isEmpty() && + !accessibleLabels.contains(RMNodeLabelsManager.NO_LABEL); + } + /** * Gets the children of this queue, if any. */ @@ -414,6 +467,17 @@ boolean assignContainerPreCheck(FSSchedulerNode node) { LOG.debug("Assigning container failed on node '" + node.getNodeName() + " because it has reserved containers."); } + + return false; + } else if (!nodeLabelCheck(node.getNodeID())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning container failed on node '" + node.getNodeName() + " because the node labels don't agree: " + accessibleLabels + " on the queue v/s " + scheduler.getLabelsManager().getLabelsOnNode(node.getNodeID()) + " on the node"); + } + + return false; } else if (!Resources.fitsIn(getResourceUsage(), maxShare)) { if (LOG.isDebugEnabled()) { LOG.debug("Assigning container failed on node '" + node.getNodeName() @@ -421,10 +485,39 @@ + " because queue resource usage is larger than MaxShare: " + dumpState()); } + + return false; - } else { - return true; } + + return true; + } + + /** + * Check if the queue's labels allow it to assign containers on this node.
+ * + * @param nodeId the ID of the node to check + * @return true if the queue is allowed to assign containers on this node + */ + protected boolean nodeLabelCheck(NodeId nodeId) { + // A queue with no label will accept any node + if (hasLabels) { + Set labelsOnNode = + scheduler.getLabelsManager().getLabelsOnNode(nodeId); + + if ((labelsOnNode == null) || labelsOnNode.isEmpty()) { + return true; + } else { + for (String queueLabel : accessibleLabels) { + if (labelsOnNode.contains(queueLabel)) { + return true; + } + } + + return false; + } + } + + return true; } /** @@ -443,10 +536,9 @@ public String toString() { @Override public Set getAccessibleNodeLabels() { - // TODO, add implementation for FS - return null; + return accessibleLabels; } - + @Override public String getDefaultNodeLabelExpression() { // TODO, add implementation for FS diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 9cf2b2e764d..9a5c60ae9d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -96,6 +96,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -104,6 +105,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; /** * A scheduler that schedules resources between a set of queues. The scheduler @@ -187,6 +189,8 @@ @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; + private RMNodeLabelsManager labelsManager; + private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; @@ -419,6 +423,16 @@ public FairSchedulerEventLog getEventLog() { } /** + * Convenience method for use by other fair scheduler components to get the + * node labels manager. + * + * @return the node labels manager + */ + RMNodeLabelsManager getLabelsManager() { + return labelsManager; + } + + /** * Add a new application to the scheduler, with a given id, queue name, and * user. This will accept a new app even if the user or queue is above * configured limits, but the app will not be marked as runnable. 
@@ -559,8 +573,11 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { appRejectMsg = "Application rejected by queue placement policy"; } else { queue = queueMgr.getLeafQueue(queueName, true); + if (queue == null) { appRejectMsg = queueName + " is not a leaf queue"; + } else if ((rmApp != null) && !queue.acceptAppNodeLabels(rmApp)) { + appRejectMsg = "Application rejected based on node labels mismatch"; } } } catch (IllegalStateException se) { @@ -718,6 +735,11 @@ private void addNode(List containerReports, usePortForNodeName); nodeTracker.addNode(schedulerNode); + if (labelsManager != null) { + labelsManager.activateNode(node.getNodeID(), + schedulerNode.getTotalResource()); + } + triggerUpdate(); Resource clusterResource = getClusterResource(); @@ -760,6 +782,10 @@ private void removeNode(RMNode rmNode) { SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } + if (labelsManager != null) { + labelsManager.deactivateNode(rmNode.getNodeID()); + } + nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); @@ -879,6 +905,11 @@ protected void nodeUpdate(RMNode nm) { super.nodeUpdate(nm); FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); + + if (labelsManager != null) { + labelsManager.activateNode(nm.getNodeID(), fsNode.getTotalResource()); + } + attemptScheduling(fsNode); long duration = getClock().getTime() - start; @@ -1243,6 +1274,12 @@ public void recover(RMState state) throws Exception { // NOT IMPLEMENTED } + /** + * Set the {@link RMContext}. + * + * @param rmContext the {@link RMContext} + */ + @Override public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } @@ -1288,6 +1325,8 @@ private void initScheduler(Configuration conf) throws IOException { eventLog.init(this.conf); allocConf = new AllocationConfiguration(conf); + labelsManager = rmContext.getNodeLabelManager(); + try { queueMgr.initialize(conf); } catch (Exception e) { @@ -1320,6 +1359,8 @@ private void initScheduler(Configuration conf) throws IOException { } catch (Exception e) { throw new IOException("Failed to initialize FairScheduler", e); } + + queueMgr.updateNodeLabels(); } @VisibleForTesting diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index c08d13e9c48..e72d1ce9c0b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -42,6 +42,7 @@ import java.util.Iterator; import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; /** * Maintains a list of queues as well as scheduling parameters for each queue, @@ -85,6 +86,20 @@ public void initialize(Configuration conf) throws IOException, } /** + * After the allocation configuration has been loaded, + * register the queues with the node labels manager. 
+   */
+  void updateNodeLabels() {
+    Map<String, Set<String>> labelsMap = new HashMap<>();
+
+    for (FSQueue queue : queues.values()) {
+      labelsMap.put(queue.getName(), queue.getAccessibleNodeLabels());
+    }
+
+    scheduler.getLabelsManager().reinitializeQueueLabels(labelsMap);
+  }
+
+  /**
    * Get a leaf queue by name, creating it if the create param is true and is necessary.
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
    * parent queue, or one of the parents in its name is already a leaf queue,
@@ -276,10 +291,14 @@ private FSQueue createNewQueues(FSQueueType queueType,
       String queueName = i.next();

       // Check if child policy is allowed
-      SchedulingPolicy childPolicy = scheduler.getAllocationConfiguration().
-          getSchedulingPolicy(queueName);
+      SchedulingPolicy childPolicy = queueConf.getSchedulingPolicy(queueName);
+
       if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) {
-        LOG.error("Can't create queue '" + queueName + "'.");
+        LOG.error("Can't create queue '" + queueName + "' because the " +
+            "configured policy (" + childPolicy.getName() + ") is not " +
+            "allowed by the parent queue's policy (" +
+            parent.getPolicy().getName() + ").");
+
         return null;
       }
@@ -301,6 +320,8 @@ private FSQueue createNewQueues(FSQueueType queueType,
       parent.addChildQueue(queue);
       setChildResourceLimits(parent, queue, queueConf);
       queues.put(queue.getName(), queue);
+      scheduler.getLabelsManager().addQueue(queueName,
+          queueConf.getAccessibleNodeLabels(queueName), true);

       // If we just created a leaf node, the newParent is null, but that's OK
       // because we only create a leaf node in the very last iteration.
@@ -523,6 +544,8 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
     rootQueue.reinit(true);
     // Update steady fair shares for all queues
     rootQueue.recomputeSteadyShares();
+
+    updateNodeLabels();
   }

   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index e58b3572968..f371ad8205c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -98,11 +98,9 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
     int queueAvailableCPU =
         Math.max(queueFairShare.getVirtualCores() - queueUsage
             .getVirtualCores(), 0);
-    Resource headroom = Resources.createResource(
+    return Resources.createResource(
         Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
-        Math.min(maxAvailable.getVirtualCores(),
-            queueAvailableCPU));
-    return headroom;
+        Math.min(maxAvailable.getVirtualCores(), queueAvailableCPU));
   }

   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 4fc0ea490b9..6729c8f4a98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -506,7 +506,7 @@ private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOExceptio p.indexOf(","))); boolean exclusivity = Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length())); - when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class))) + when(nlm.getResourceByLabel(eq(partitionName))) .thenReturn(res); when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index a14a2b13684..7c19ad6b763 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; @@ -1087,7 +1086,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, private void setResourceAndNodeDetails() { when(mCS.getClusterResource()).thenReturn(clusterResources); - when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( + when(lm.getResourceByLabel(anyString())).thenReturn( clusterResources); FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java index 1da6f93f664..a734c47591f 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java
@@ -88,37 +88,37 @@ public void testGetLabelResourceWhenNodeActiveDeactive() throws Exception {
     mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
         toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
-    Assert.assertEquals(mgr.getResourceByLabel("p2", null), EMPTY_RESOURCE);
-    Assert.assertEquals(mgr.getResourceByLabel("p3", null), EMPTY_RESOURCE);
-    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p2"), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p3"), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL),
         EMPTY_RESOURCE);

     // activate two NMs on n1, one large and one small
     mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
     mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"),
         Resources.add(SMALL_RESOURCE, LARGE_NODE));

     // check that adding labels multiple times doesn't overwrite
     // original attributes on labels, like resource
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p4"));
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"),
         Resources.add(SMALL_RESOURCE, LARGE_NODE));
-    Assert.assertEquals(mgr.getResourceByLabel("p4", null), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p4"), EMPTY_RESOURCE);

     // change the large NM to small, check if the resource is updated
     mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE);
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"),
         Resources.multiply(SMALL_RESOURCE, 2));

     // deactivate one NM and check if the resource is updated
     mgr.deactivateNode(NodeId.newInstance("n1", 1));
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p1"), SMALL_RESOURCE);

     // continue deactivating and check if the resource is updated
     mgr.deactivateNode(NodeId.newInstance("n1", 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p1"), EMPTY_RESOURCE);

     // Add two NMs to n1 back
     mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
@@ -126,7 +126,7 @@ public void testGetLabelResourceWhenNodeActiveDeactive() throws Exception {

     // And remove p1; now the two NMs should fall back to the default label
     mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
-    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL),
         Resources.add(SMALL_RESOURCE, LARGE_NODE));
   }

@@ -152,10 +152,10 @@ public void testGetLabelResource() throws Exception {
     // change label of n1 to p2
     mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
-    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p2"),
         Resources.multiply(SMALL_RESOURCE, 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p3", null), SMALL_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p3"), SMALL_RESOURCE);

     // add more labels
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p4", "p5", "p6"));
@@ -180,16 +180,16 @@ public void testGetLabelResource() throws Exception {
     mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE);

     // check variables
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE);
-    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"), SMALL_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p2"),
         Resources.multiply(SMALL_RESOURCE, 3));
-    Assert.assertEquals(mgr.getResourceByLabel("p3", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p3"),
         Resources.multiply(SMALL_RESOURCE, 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p4", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p4"),
         Resources.multiply(SMALL_RESOURCE, 1));
-    Assert.assertEquals(mgr.getResourceByLabel("p5", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p5"),
         Resources.multiply(SMALL_RESOURCE, 1));
-    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL),
         Resources.multiply(SMALL_RESOURCE, 1));

     // change a bunch of nodes -> labels
@@ -212,24 +212,22 @@ public void testGetLabelResource() throws Exception {
         toNodeId("n9"), toSet("p1")));

     // check variables
-    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p1"),
         Resources.multiply(SMALL_RESOURCE, 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p2"),
         Resources.multiply(SMALL_RESOURCE, 3));
-    Assert.assertEquals(mgr.getResourceByLabel("p3", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p3"),
         Resources.multiply(SMALL_RESOURCE, 2));
-    Assert.assertEquals(mgr.getResourceByLabel("p4", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p4"),
         Resources.multiply(SMALL_RESOURCE, 0));
-    Assert.assertEquals(mgr.getResourceByLabel("p5", null),
+    Assert.assertEquals(mgr.getResourceByLabel("p5"),
         Resources.multiply(SMALL_RESOURCE, 0));
-    Assert.assertEquals(mgr.getResourceByLabel("", null),
+    Assert.assertEquals(mgr.getResourceByLabel(""),
         Resources.multiply(SMALL_RESOURCE, 2));
   }

   @Test(timeout=5000)
   public void testGetQueueResource() throws Exception {
-    Resource clusterResource = Resource.newInstance(9999, 1);
-
     /*
      * Node->Labels:
      *   host1 : red
@@ -266,15 +264,14 @@ public void testGetQueueResource() throws Exception {
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));

     mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host2"), toSet("blue")));

     /*
@@ -288,15 +285,14 @@ public void testGetQueueResource() throws Exception {
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));

     /*
      * Check resource after deactivating/activating some nodes
@@ -312,15 +308,14 @@ public void testGetQueueResource() throws Exception {
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));

     /*
      * Check resource after refresh queue:
@@ -347,15 +342,14 @@ public void testGetQueueResource() throws Exception {
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));

     /*
      * Activate NMs on hosts that already have an NM
@@ -370,15 +364,14 @@ public void testGetQueueResource() throws Exception {
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));

     /*
      * Deactivate NMs on hosts that already have NMs
@@ -393,15 +386,14 @@ public void testGetQueueResource() throws Exception {
     // check resource
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q1", q1Label, clusterResource));
+        mgr.getQueueResource("Q1"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q2", q2Label, clusterResource));
+        mgr.getQueueResource("Q2"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q3", q3Label, clusterResource));
+        mgr.getQueueResource("Q3"));
     Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
-        mgr.getQueueResource("Q4", q4Label, clusterResource));
-    Assert.assertEquals(clusterResource,
-        mgr.getQueueResource("Q5", q5Label, clusterResource));
+        mgr.getQueueResource("Q4"));
+    Assert.assertNull(mgr.getQueueResource("Q5"));
   }

   @Test(timeout=5000)
@@ -414,7 +406,7 @@ public void testGetLabelResourceWhenMultipleNMsExistingInSameHost() throws IOExc
     // check resource of no label, it should be small * 4
     Assert.assertEquals(
-        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, null),
+        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL),
         Resources.multiply(SMALL_RESOURCE, 4));

     // change two of these nodes to p1, check resource of no_label and P1
@@ -424,10 +416,10 @@ public void testGetLabelResourceWhenMultipleNMsExistingInSameHost() throws IOExc
     // check resource
     Assert.assertEquals(
-        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, null),
+        mgr.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL),
         Resources.multiply(SMALL_RESOURCE, 2));
     Assert.assertEquals(
-        mgr.getResourceByLabel("p1", null),
+        mgr.getResourceByLabel("p1"),
         Resources.multiply(SMALL_RESOURCE, 2));
   }

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index eef86a44990..363ecf7747e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -276,8 +276,8 @@ public static RMContext createRMContext(Configuration conf) {
         new ClientToAMTokenSecretManagerInRM(), null));

     RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
-    when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
-        any(Resource.class))).thenAnswer(new Answer<Resource>() {
+    when(nlm.getQueueResource(any(String.class)))
+        .thenAnswer(new Answer<Resource>() {
       @Override
       public Resource answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
@@ -285,7 +285,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
       }
     });

-    when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
+    when(nlm.getResourceByLabel(any(String.class)))
         .thenAnswer(new Answer<Resource>() {
       @Override
       public Resource answer(InvocationOnMock invocation) throws Throwable {
diff --git
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index a32352b3af2..9cd637f95a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -3916,7 +3916,7 @@ public void testApplicationQueuePercent() when(rmContext.getRMApps()) .thenReturn(new ConcurrentHashMap()); RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); - when(nlm.getResourceByLabel(any(), any())).thenReturn(res); + when(nlm.getResourceByLabel(any())).thenReturn(res); when(rmContext.getNodeLabelManager()).thenReturn(nlm); // Queue "test" consumes 100% of the cluster, so its capacity and absolute diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 4bc5127e9da..da890b509f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -109,9 +109,8 @@ public void register(Class eventType, new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM()); RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); - when( - nlm.getQueueResource(any(String.class), any(Set.class), - any(Resource.class))).thenAnswer(new Answer() { + when(nlm.getQueueResource(any(String.class))) + .thenAnswer(new Answer() { @Override public Resource answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); @@ -119,7 +118,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { } }); - when(nlm.getResourceByLabel(any(String.class), any(Resource.class))) + when(nlm.getResourceByLabel(any(String.class))) .thenAnswer(new Answer() { @Override public Resource answer(InvocationOnMock invocation) throws Throwable { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd32a0..1028999b990 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -118,7 +118,15 @@ protected ResourceRequest createResourceRequest(
   protected ResourceRequest createResourceRequest(
       int memory, int vcores, String host, int priority, int numContainers,
       boolean relaxLocality) {
-    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
+    return createResourceRequest(memory, vcores, host, priority, numContainers,
+        relaxLocality, RMNodeLabelsManager.NO_LABEL);
+  }
+
+  protected ResourceRequest createResourceRequest(
+      int memory, int vcores, String host, int priority, int numContainers,
+      boolean relaxLocality, String label) {
+    ResourceRequest request =
+        recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
     request.setNumContainers(numContainers);
@@ -126,7 +134,8 @@ protected ResourceRequest createResourceRequest(
     prio.setPriority(priority);
     request.setPriority(prio);
     request.setRelaxLocality(relaxLocality);
-    request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    request.setNodeLabelExpression(label);
+
     return request;
   }

@@ -163,6 +172,13 @@ protected ApplicationAttemptId createSchedulingRequest(
   protected ApplicationAttemptId createSchedulingRequest(
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
+    return createSchedulingRequest(memory, vcores, queueId, userId,
+        numContainers, priority, RMNodeLabelsManager.NO_LABEL);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority, String label) {
     ApplicationAttemptId id =
         createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
@@ -170,9 +186,9 @@ protected ApplicationAttemptId createSchedulingRequest(
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
       scheduler.addApplicationAttempt(id, false, false);
     }
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> ask = new ArrayList<>();
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
+        priority, numContainers, true, label);
     ask.add(request);

     RMApp rmApp = mock(RMApp.class);
@@ -188,8 +204,8 @@ protected ApplicationAttemptId createSchedulingRequest(
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);

-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
-        null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(id, ask, new ArrayList<>(), null, null,
+        NULL_UPDATE_REQUESTS);
     scheduler.update();
     return id;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java
new file mode 100644
index 00000000000..4c67459c7a3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +@SuppressWarnings("unchecked") +public class TestFSNodeLabel extends FairSchedulerTestBase { + private final static String ALLOC_FILE = + new File(TEST_DIR, "test-queues").getAbsolutePath(); + + @Before + public void setUp() throws IOException { + scheduler = new FairScheduler(); + conf = createConfiguration(); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + resourceManager = new ResourceManager(); + resourceManager.init(conf); + + // TODO: This test should really be using MockRM. For now starting stuff + // that is needed at a bare minimum. 
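+    // Start the dispatcher and state store by hand, and roll the container
+    // token master key below, so that container tokens can be generated
+    // during allocation without a fully started RM.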
+    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    resourceManager.getRMContext().getStateStore().start();
+
+    // to initialize the master key
+    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+    scheduler.setRMContext(resourceManager.getRMContext());
+  }
+
+  @After
+  public void tearDown() {
+    if (scheduler != null) {
+      scheduler.stop();
+      scheduler = null;
+    }
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.shutdown();
+  }
+
+  @Test
+  public void testAssignmentWithNodeLabel() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"root\">");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<label>label2</label>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<label>label3,label4</label>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create 3 nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+    // add node labels
+    Set<NodeLabel> clusterLabel = new HashSet<>();
+    clusterLabel.add(NodeLabel.newInstance("label1"));
+    clusterLabel.add(NodeLabel.newInstance("label2"));
+    clusterLabel.add(NodeLabel.newInstance("label3"));
+    scheduler.getLabelsManager().addToCluserNodeLabels(clusterLabel);
+    Map<NodeId, Set<String>> nodeLabelMap = new HashMap<>();
+    // node1: no label
+    // node2: label2
+    // node3: label3
+    Set<String> nodeLabel1 = new HashSet<>();
+    nodeLabel1.add(RMNodeLabelsManager.NO_LABEL);
+    Set<String> nodeLabel2 = new HashSet<>();
+    nodeLabel2.add("label2");
+    Set<String> nodeLabel3 = new HashSet<>();
+    nodeLabel3.add("label3");
+    nodeLabelMap.put(node1.getNodeID(), nodeLabel1);
+    nodeLabelMap.put(node2.getNodeID(), nodeLabel2);
+    nodeLabelMap.put(node3.getNodeID(), nodeLabel3);
+    scheduler.getLabelsManager().addLabelsToNode(nodeLabelMap);
+
+    // case1: an app submitted to a queue without node labels can be allocated
+    // on any node; its requests use the default (empty) label, so its
+    // containers land on node1, the only unlabeled node
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "root.queueA",
+        "user1", 3, 3);
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeUpdate11 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate12 = new NodeUpdateSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent nodeUpdate13 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeUpdate11);
+    scheduler.handle(nodeUpdate12);
+    scheduler.handle(nodeUpdate13);
+    // app1 will be allocated on node1
+    for (RMContainer container: scheduler.getSchedulerApp(appAttId1).getLiveContainers()) {
+      assertTrue(container.getAllocatedNode().equals(node1.getNodeID()));
+    }
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        appAttId1, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+
+    // case2: an app submitted to queueB can only be allocated on node2,
+    // now that the previous app has finished
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 1,
+        "root.queueB", "user1", 3, 3, "label2");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeUpdate21 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate22 = new NodeUpdateSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent nodeUpdate23 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeUpdate21);
+    scheduler.handle(nodeUpdate22);
+    scheduler.handle(nodeUpdate23);
+    // app2 will be allocated on node2
+    for (RMContainer container: scheduler.getSchedulerApp(appAttId2).getLiveContainers()) {
+      assertTrue(container.getAllocatedNode().equals(node2.getNodeID()));
+    }
+    AppAttemptRemovedSchedulerEvent appRemovedEvent2 = new AppAttemptRemovedSchedulerEvent(
+        appAttId2, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent2);
+    scheduler.update();
+
+    // case3: an app submitted to queueC will be allocated on node3,
+    // now that the previous app has finished
+    ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 1,
+        "root.queueC", "user1", 3, 3, "label3");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeUpdate31 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate32 = new NodeUpdateSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent nodeUpdate33 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeUpdate31);
+    scheduler.handle(nodeUpdate32);
+    scheduler.handle(nodeUpdate33);
+    // app3 will be allocated on node3
+    for (RMContainer container: scheduler.getSchedulerApp(appAttId3).getLiveContainers()) {
+      assertTrue(container.getAllocatedNode().equals(node3.getNodeID()));
+    }
+    AppAttemptRemovedSchedulerEvent appRemovedEvent3 = new AppAttemptRemovedSchedulerEvent(
+        appAttId3, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent3);
+    scheduler.update();
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
index 3ae8f832150..45c5fa6ed43 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
@@ -24,6 +24,7 @@
 import java.util.HashSet;
 import java.util.Set;

+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -40,11 +41,15 @@ public void setUp() throws Exception {
     conf = new FairSchedulerConfiguration();
     FairScheduler scheduler = mock(FairScheduler.class);
     AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+    RMNodeLabelsManager labelsMgr = new RMNodeLabelsManager();
+
+    labelsMgr.init(conf);
     when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     when(scheduler.getConf()).thenReturn(conf);
+
when(scheduler.getLabelsManager()).thenReturn(labelsMgr); SystemClock clock = SystemClock.getInstance(); when(scheduler.getClock()).thenReturn(clock); - notEmptyQueues = new HashSet(); + notEmptyQueues = new HashSet<>(); queueManager = new QueueManager(scheduler) { @Override public boolean isEmpty(FSQueue queue) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 0ef4d7ba5ea..85d23d46e8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -133,7 +133,7 @@ public void setUp() throws IOException { scheduler = new FairScheduler(); conf = createConfiguration(); - resourceManager = new MockRM(conf); + resourceManager = new MockRM(conf, null, false, false); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); resourceManager.getRMContext().getStateStore().start(); @@ -319,14 +319,19 @@ public void testSimpleFairShareCalculation() throws IOException { scheduler.getQueueManager().getRootQueue().recomputeSteadyShares(); Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); + assertEquals("Unexpected number of queues", 3, queues.size()); // Divided three ways - between the two queues and the default queue for (FSLeafQueue p : queues) { - assertEquals(3414, p.getFairShare().getMemorySize()); - assertEquals(3414, p.getMetrics().getFairShareMB()); - assertEquals(3414, p.getSteadyFairShare().getMemorySize()); - assertEquals(3414, p.getMetrics().getSteadyFairShareMB()); + assertEquals("Fair share for " + p.getName() + " should be 1/3 of the " + + "cluster", 3414, p.getFairShare().getMemorySize()); + assertEquals("Fair share in metrics for " + p.getName() + " should be " + + "1/3 of the cluster", 3414, p.getMetrics().getFairShareMB()); + assertEquals("Steady fair share for " + p.getName() + " should be 1/3 of " + + "the cluster", 3414, p.getSteadyFairShare().getMemorySize()); + assertEquals("Steady fair share in metrics for " + p.getName() + + " should be 1/3 of the cluster", + 3414, p.getMetrics().getSteadyFairShareMB()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java index 09c2370395a..f652d62acd3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java @@ -27,7 +27,9 @@ import 
org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.junit.After;
 import org.junit.Before;
@@ -41,6 +43,13 @@
   @Before
   public void setUp() throws IOException {
     scheduler = new FairScheduler();
+
+    RMContextImpl context = new RMContextImpl();
+    RMNodeLabelsManager labelsMgr = new RMNodeLabelsManager();
+
+    labelsMgr.init(new Configuration());
+    context.setNodeLabelManager(labelsMgr);
+    scheduler.setRMContext(context);

     Configuration conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
index f77df1ea399..0e53e200f13 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,15 +48,19 @@
   @Before
   public void setup() throws Exception {
     Configuration conf = new Configuration();
+    AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+    RMNodeLabelsManager labelsMgr = new RMNodeLabelsManager();
+
     clock = new ControlledClock();
     scheduler = mock(FairScheduler.class);
     when(scheduler.getConf()).thenReturn(
         new FairSchedulerConfiguration(conf));
     when(scheduler.getClock()).thenReturn(clock);
-    AllocationConfiguration allocConf = new AllocationConfiguration(
-        conf);
+    when(scheduler.getLabelsManager()).thenReturn(labelsMgr);
     when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
+    labelsMgr.init(conf);
     queueManager = new QueueManager(scheduler);
     queueManager.initialize(conf);
     userMaxApps = allocConf.userMaxApps;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
index 05ab11c3150..1a2c5b74105 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
+++
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -29,6 +29,7 @@ import org.junit.Test; import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; public class TestQueueManager { private FairSchedulerConfiguration conf; @@ -42,12 +43,16 @@ public void setUp() throws Exception { scheduler = mock(FairScheduler.class); AllocationConfiguration allocConf = new AllocationConfiguration(conf); + RMNodeLabelsManager labelsMgr = new RMNodeLabelsManager(); + + labelsMgr.init(conf); // Set up some queues to test default child max resource inheritance allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test"); allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA"); allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB"); + when(scheduler.getLabelsManager()).thenReturn(labelsMgr); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getConf()).thenReturn(conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index b016c1b4fb8..152a97e3676 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -52,6 +55,13 @@ public void setUp() throws Exception { scheduler = new FairScheduler(); conf = new FairSchedulerConfiguration(); + + RMContext context = new RMContextImpl(); + RMNodeLabelsManager labelsMgr = new RMNodeLabelsManager(); + + context.setNodeLabelManager(labelsMgr); + scheduler.setRMContext(context); + labelsMgr.init(conf); } public void testParseSchedulingPolicy()