commit bf61a99e01d362573942c47d2e4ecf03c0cf54f6 Author: Wangda Tan Date: Tue Jan 3 17:19:11 2017 -0800 YARN-5864.2 TODO: adding reservation-changing logic TODO: adding more test cases for priority-selector diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index ab36a4e..c090749 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -183,10 +183,6 @@ - - - - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 8255a30..a80f317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -49,13 +50,11 @@ @Override public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { - if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { - return -1; - } - if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { - return 1; - } - return 0; + double assigned1 = getIdealPctOfGuaranteed(tq1); + double assigned2 = getIdealPctOfGuaranteed(tq2); + + return PriorityUtilizationQueueOrderingPolicy.compare(assigned1, + assigned2, tq1.relativePriority, tq2.relativePriority); } // Calculates idealAssigned / guaranteed @@ -156,6 +155,7 @@ protected void computeFixpointAllocation(Resource totGuarant, // way, the most underserved queue(s) are always given resources first. 
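For reference, the static helper that the rewritten comparator delegates to is not part of this hunk; based on the semantics described elsewhere in this patch (priority wins only when it would not starve an underserved queue), it plausibly looks like the sketch below — a reconstruction, not necessarily the verbatim upstream code:

public static int compare(double relativeAssigned1, double relativeAssigned2,
    int priority1, int priority2) {
  if (priority1 == priority2) {
    // Same priority: the queue with less relative used-capacity goes first.
    return Double.compare(relativeAssigned1, relativeAssigned2);
  }
  if ((relativeAssigned1 < 1.0 && relativeAssigned2 < 1.0)
      || (relativeAssigned1 >= 1.0 && relativeAssigned2 >= 1.0)) {
    // Both queues under their guarantee, or both at/over it:
    // the higher-priority queue goes first.
    return Integer.compare(priority2, priority1);
  }
  // Exactly one queue is under its guarantee: it goes first regardless of
  // priority, so priority never starves an underserved queue.
  return Double.compare(relativeAssigned1, relativeAssigned2);
}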
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues( orderedByNeed, tqComparator); + for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i.hasNext();) { TempQueuePerPartition sub = i.next(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index b48a287..4d8afaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -48,7 +48,8 @@ * @param selectedCandidates already selected candidates from previous policies * @param clusterResource total resource * @param totalPreemptedResourceAllowed how many resources allowed to be - * preempted in this round + * preempted in this round. Will be + * updated (set in place) by the call * @return merged selected candidates. */ public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 324e845..de60148 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -214,6 +217,14 @@ public void init(Configuration config, RMContext context, if (isIntraQueuePreemptionEnabled) { candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); } + + // Do we need queue-priority preemption policy?
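// (Note: each enabled selector runs in the order it was added to
// candidatesSelectionPolicies; later selectors see the candidates already
// chosen by earlier ones, plus the remaining preemption budget, which
// selectCandidates updates in place.)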
+ boolean isQueuePriorityPreemptionEnabled = + csConfig.getPriorityUtilizationOrderingPolicyPreemptionEnabled(); + if (isQueuePriorityPreemptionEnabled) { + candidatesSelectionPolicies.add( + new QueuePriorityContainerCandidateSelector(this)); + } } @Override @@ -352,6 +363,8 @@ private void containerBasedPreemptOrKill(CSQueue root, .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt); } + + // Update effective priority of queues } this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames( @@ -470,11 +483,22 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, reserved, curQueue); if (curQueue instanceof ParentQueue) { + String configuredOrderingPolicy = + ((ParentQueue) curQueue).getQueueOrderingPolicy().getConfigName(); + // Recursively add children for (CSQueue c : curQueue.getChildQueues()) { TempQueuePerPartition subq = cloneQueues(c, partitionResource, partitionToLookAt); + + // If we respect priority + if (StringUtils.equals( + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, + configuredOrderingPolicy)) { + subq.relativePriority = c.getPriority().getPriority(); + } ret.addChild(subq); + subq.parent = ret; } } } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java new file mode 100644 index 0000000..3f260e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class QueuePriorityContainerCandidateSelector + extends PreemptionCandidatesSelector { + private static final Log LOG = + LogFactory.getLog(QueuePriorityContainerCandidateSelector.class); + + // Configured timeout before doing reserved container preemption + private long minTimeout; + + // All the reserved containers of the system which could possible preempt from + // queue with lower priorities + private List reservedContainers; + + // From -> To + // A digraph to represent if one queue has higher priority than another. + // For example, a->b means queue=a has higher priority than queue=b + private Table priorityDigraph = null; + + private Resource clusterResource; + private Map> selectedCandidates; + private Resource totalPreemptionAllowed; + + // A cached scheduler node map, will be refreshed each round. 
+ private Resource clusterResource; + private Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates; + private Resource totalPreemptionAllowed; + + // A cached scheduler node map, will be refreshed each round. + private Map<NodeId, TempSchedulerNode> tempSchedulerNodeMap = new HashMap<>(); + + // Have we touched (made any changes to) the node in this round? + // Once a node is touched, we will not try to move reservations to it. + private Set<NodeId> touchedNodes; + + private static final Comparator<RMContainer> + CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>() { + @Override + public int compare(RMContainer o1, RMContainer o2) { + return Long.compare(o1.getCreationTime(), o2.getCreationTime()); + } + }; + + QueuePriorityContainerCandidateSelector( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + + // Initialize parameters + CapacitySchedulerConfiguration csc = + preemptionContext.getScheduler().getConfiguration(); + + minTimeout = csc.getPriorityUtilizationOrderingPolicyPreemptionTimeout(); + } + + private List<TempQueuePerPartition> getPathToRoot(TempQueuePerPartition tq) { + List<TempQueuePerPartition> list = new ArrayList<>(); + while (tq != null) { + list.add(tq); + tq = tq.parent; + } + return list; + } + + private void initializePriorityDigraph() { + // Make sure we iterate all leaf queue combinations + for (String q1 : preemptionContext.getLeafQueueNames()) { + for (String q2 : preemptionContext.getLeafQueueNames()) { + // Make sure we only calculate each combination once instead of all + // permutations + if (q1.compareTo(q2) < 0) { + TempQueuePerPartition tq1 = preemptionContext.getQueueByPartition(q1, + RMNodeLabelsManager.NO_LABEL); + TempQueuePerPartition tq2 = preemptionContext.getQueueByPartition(q2, + RMNodeLabelsManager.NO_LABEL); + + List<TempQueuePerPartition> path1 = getPathToRoot(tq1); + List<TempQueuePerPartition> path2 = getPathToRoot(tq2); + + // Get the direct ancestors below the LCA (lowest common ancestor) + int i = path1.size() - 1; + int j = path2.size() - 1; + while (path1.get(i).queueName.equals(path2.get(j).queueName)) { + i--; + j--; + } + + // compare priority of path1[i] and path2[j] + int p1 = path1.get(i).relativePriority; + int p2 = path2.get(j).relativePriority; + if (p1 < p2) { + priorityDigraph.put(q2, q1, true); + if (LOG.isDebugEnabled()) { + LOG.debug("- Added priority ordering edge: " + q2 + " >> " + q1); + } + } else if (p2 < p1) { + priorityDigraph.put(q1, q2, true); + if (LOG.isDebugEnabled()) { + LOG.debug("- Added priority ordering edge: " + q1 + " >> " + q2); + } + } + } + } + } + }
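// Worked example (hypothetical hierarchy): with root -> {a(priority=2),
// b(priority=1)}, a -> x and b -> y, getPathToRoot(x) = [x, a, root] and
// getPathToRoot(y) = [y, b, root]. Walking back from root, the first
// divergence is a vs. b, so the edge x >> y is added because a's
// relativePriority (2) is greater than b's (1).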
+ /** + * Do we allow demandingQueue to preempt resources from toBePreemptedQueue? + * + * @param demandingQueue demandingQueue + * @param toBePreemptedQueue toBePreemptedQueue + * @return can/cannot + */ + private boolean preemptionAllowed(String demandingQueue, + String toBePreemptedQueue) { + return priorityDigraph.contains(demandingQueue, + toBePreemptedQueue); + } + + /** + * Can we preempt enough resources for the given ask? + * + * @param requiredResource askedResource + * @param demandingQueue demandingQueue + * @param schedulerNode node + * @param toBeUnreservedContainer container-to-be-unreserved, set when trying + * to move a reserved container to a different node + * @param lookingForNewReservationPlacement are we trying to move a + * reservation to this node? + * @param newlySelectedContainers newly selected containers, will be set when + * we can preempt enough resources from the node. + * + * @return can/cannot + */ + private boolean canPreemptEnoughResourceForAsked(Resource requiredResource, + String demandingQueue, FiCaSchedulerNode schedulerNode, + RMContainer toBeUnreservedContainer, + boolean lookingForNewReservationPlacement, + List<RMContainer> newlySelectedContainers) { + // Do not check touched nodes again. + if (touchedNodes.contains(schedulerNode.getNodeID())) { + return false; + } + + TempSchedulerNode node = tempSchedulerNodeMap.get(schedulerNode.getNodeID()); + if (null == node) { + node = TempSchedulerNode.fromSchedulerNode(schedulerNode); + tempSchedulerNodeMap.put(schedulerNode.getNodeID(), node); + } + + if (null != toBeUnreservedContainer) { + if (lookingForNewReservationPlacement) { + // Node reserved by the same queue: skip this node. We will not try to + // move the reservation to a node whose resource is reserved by the + // same queue. + if (toBeUnreservedContainer.getQueueName().equals(demandingQueue)) { + return false; + } + + // Node reserved by another queue, and unfortunately we won't be able + // to preempt from that queue + if (!preemptionAllowed(demandingQueue, + toBeUnreservedContainer.getQueueName())) { + return false; + } + } + } + + // Needed preemption = asked - (node.total - node.allocated) + Resource lacking = Resources.subtract(requiredResource, Resources + .subtract(node.getTotalResource(), node.getAllocatedResource())); + + // On each host, simply check whether we could preempt containers from + // lower-priority queues or not + List<RMContainer> runningContainers = node.getRunningContainers(); + Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR); + + // First of all, consider already selected containers + for (RMContainer runningContainer : runningContainers) { + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected( + runningContainer, selectedCandidates)) { + Resources.subtractFrom(lacking, + runningContainer.getAllocatedResource()); + } + } + + // If we can already allocate the reserved container after preemption, + // skip the following steps + if (Resources.fitsIn(rc, clusterResource, lacking, + Resources.none())) { + return true; + } + + Resource allowed = Resources.clone(totalPreemptionAllowed); + Resource selected = Resources.createResource(0); + + for (RMContainer runningContainer : runningContainers) { + // Only preempt resources from queues with lower priority + if (!preemptionAllowed(demandingQueue, + runningContainer.getQueueName())) { + continue; + } + + // Don't preempt AM containers + if (runningContainer.isAMContainer()) { + continue; + } + + // Don't allow preempting more than the limit + if (Resources.greaterThanOrEqual(rc, clusterResource, allowed, + runningContainer.getAllocatedResource())) { + Resources.subtractFrom(allowed, + runningContainer.getAllocatedResource()); + Resources.subtractFrom(lacking, + runningContainer.getAllocatedResource()); + Resources.addTo(selected, runningContainer.getAllocatedResource()); + + if (null != newlySelectedContainers) { + newlySelectedContainers.add(runningContainer); + } + } + + // lacking <= 0 means we can allocate the reserved container + if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) { + return true; + } + } + + return false; + } + + private boolean preChecksForMovingReservedContainerToNode( + RMContainer reservedContainer, FiCaSchedulerNode newNode) { + // Don't do this if it has hard-locality preferences + if (reservedContainer.getReservedSchedulerKey().getContainerToUpdate() + != null) { + // This means a container update request (like increase / promote) + return false; + } + + // For normal requests + FiCaSchedulerApp app = + preemptionContext.getScheduler().getApplicationAttempt( + reservedContainer.getApplicationAttemptId()); + if (!app.getAppSchedulingInfo().canDelayTo( + reservedContainer.getAllocatedSchedulerKey(), ResourceRequest.ANY)) { + // This is a hard locality request + return false; + }
+ + // Check if newNode's partition matches the requested partition + if (!StringUtils.equals(reservedContainer.getNodeLabelExpression(), + newNode.getPartition())) { + return false; + } + + return true; + } + + private void tryToMakeBetterReservationPlacement( + RMContainer reservedContainer, + List<FiCaSchedulerNode> allSchedulerNodes) { + for (FiCaSchedulerNode targetNode : allSchedulerNodes) { + // Pre-check whether we can move the reserved container to the new + // targetNode + if (!preChecksForMovingReservedContainerToNode(reservedContainer, + targetNode)) { + continue; + } + + RMContainer toBeUnreservedContainer = targetNode.getReservedContainer(); + if (canPreemptEnoughResourceForAsked( + reservedContainer.getReservedResource(), + reservedContainer.getQueueName(), targetNode, toBeUnreservedContainer, + true, null)) { + NodeId fromNode = reservedContainer.getNodeId(); + + // We can place the container on this targetNode, so just go ahead and + // notify the scheduler + if (preemptionContext.getScheduler().moveReservedContainer( + reservedContainer, targetNode.getReservedContainer(), targetNode)) { + LOG.info("Successfully moved reserved container=" + reservedContainer + .getContainerId() + " from node=" + fromNode + + " to targetNode=" + targetNode.getNodeID()); + touchedNodes.add(targetNode.getNodeID()); + } + } + } + } + + @Override + public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Resource clusterResource, + Resource totalPreemptedResourceAllowed) { + // Save parameters to be shared by other methods + this.selectedCandidates = selectedCandidates; + this.clusterResource = clusterResource; + this.totalPreemptionAllowed = totalPreemptedResourceAllowed; + + if (null == priorityDigraph) { + // Initialize digraph for the first time + priorityDigraph = HashBasedTable.create(); + initializePriorityDigraph(); + } + + if (priorityDigraph.isEmpty()) { + return selectedCandidates; + } + + reservedContainers = new ArrayList<>(); + + // Clear temp-scheduler-node-map every time when doing selection of + // containers.
+ tempSchedulerNodeMap.clear(); + touchedNodes = new HashSet<>(); + + // Add all reserved containers for analysis + List<FiCaSchedulerNode> allSchedulerNodes = + preemptionContext.getScheduler().getAllNodes(); + for (FiCaSchedulerNode node : allSchedulerNodes) { + RMContainer reservedContainer = node.getReservedContainer(); + if (null != reservedContainer) { + // Add to the reservedContainers list if the queue that the reserved + // container belongs to has higher priority than at least one queue + if (priorityDigraph.containsRow( + reservedContainer.getQueueName())) { + reservedContainers.add(reservedContainer); + } + } + } + + // Sort reserved containers by creation time + Collections.sort(reservedContainers, CONTAINER_CREATION_TIME_COMPARATOR); + + long currentTime = System.currentTimeMillis(); + + // From the beginning of the list + for (RMContainer reservedContainer : reservedContainers) { + // Only try to preempt for a reserved container after it has existed for + // at least minTimeout and still cannot be allocated + if (currentTime - reservedContainer.getCreationTime() < minTimeout) { + break; + } + + FiCaSchedulerNode node = preemptionContext.getScheduler().getNode( + reservedContainer.getReservedNode()); + if (null == node) { + // Something is wrong, ignore + continue; + } + + List<RMContainer> newlySelectedToBePreemptContainers = new ArrayList<>(); + boolean canPreempt = canPreemptEnoughResourceForAsked( + reservedContainer.getReservedResource(), + reservedContainer.getQueueName(), node, null, false, + newlySelectedToBePreemptContainers); + + // Add the selected containers if we can allocate the reserved container + // by preempting others + if (canPreempt) { + touchedNodes.add(node.getNodeID()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to preempt the following containers to make reserved " + + "container=" + reservedContainer.getContainerId() + " on node=" + + node.getNodeID() + " allocatable:"); + } + + for (RMContainer c : newlySelectedToBePreemptContainers) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --container=" + c.getContainerId() + " resource=" + c + .getReservedResource()); + } + + Set<RMContainer> containers = selectedCandidates.get( + c.getApplicationAttemptId()); + if (null == containers) { + containers = new HashSet<>(); + selectedCandidates.put(c.getApplicationAttemptId(), containers); + } + containers.add(c); + + // Update totalPreemptedResourceAllowed + Resources.subtractFrom(totalPreemptedResourceAllowed, + c.getAllocatedResource()); + } + } else { + // We failed to get enough resources to allocate the container. + // This typically happens when the reserved node is not a good fit; we + // will try to see if we can reserve the container on a better host.
+ tryToMakeBetterReservationPlacement(reservedContainer, + allSchedulerNodes); + } + } + + return selectedCandidates; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 28099c4..2bd097c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -51,6 +51,12 @@ LeafQueue leafQueue; boolean preemptionDisabled; + // Relative priority of this queue to its parent. + // If the parent queue's ordering policy doesn't respect priority, + // this will always be 0 + int relativePriority = 0; + TempQueuePerPartition parent = null; + TempQueuePerPartition(String queueName, Resource current, boolean preemptionDisabled, String partition, Resource killable, float absCapacity, float absMaxCapacity, Resource totalPartitionResource, @@ -112,8 +118,15 @@ Resource offer(Resource avail, ResourceCalculator rc, Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - - // assigned)) + // accepted = min{avail, + // max - assigned, + // current + pending - assigned, + // # Make sure a queue will not get more than the max of its + // # used/guaranteed: this is to make sure preemption won't + // # happen if all active queues are beyond their guarantees. + // # This is for leaf queues only. + // max(guaranteed, used) - assigned} + // remain = avail - accepted Resource accepted = Resources.min(rc, clusterResource, absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, Resources @@ -136,6 +149,21 @@ Resource offer(Resource avail, ResourceCalculator rc, ? getUsed() : getUsedDeductReservd()), pending), idealAssigned))); + + // For leaf queues: accept = min(accept, max(guaranteed, used) - assigned) + // Why only for leaf queues? + // Because a satisfied parent queue could still have some under-utilized + // leaf queues. Such an under-utilized leaf queue could preempt resources + // from over-utilized leaf queues located in other hierarchies.
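// Hypothetical numeric example of the cap applied below: suppose avail=30,
// max=100, idealAssigned=40, current(used)=50, pending=30, guaranteed=40.
// Then max - assigned = 60, current + pending - assigned = 40, and
// max(guaranteed, used) - assigned = 50 - 40 = 10, so
// accepted = min(30, 60, 40, 10) = 10 and remain = 30 - 10 = 20.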
+ if (null == children || children.isEmpty()) { + Resource maxOfGuaranteedAndUsedDeductAssigned = Resources.subtract( + Resources.max(rc, clusterResource, getUsed(), getGuaranteed()), + idealAssigned); + maxOfGuaranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource, + maxOfGuaranteedAndUsedDeductAssigned, Resources.none()); + accepted = Resources.min(rc, clusterResource, accepted, + maxOfGuaranteedAndUsedDeductAssigned); + } Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java new file mode 100644 index 0000000..320f262 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.List; + +/** + * This class saves the necessary information copied from a + * FiCaSchedulerNode. It was added mainly for performance: the snapshot + * can be cached to avoid hitting the scheduler again and again. In addition, + * we can add preemption-required fields to the class. + */ +public class TempSchedulerNode { + private List<RMContainer> runningContainers; + private RMContainer reservedContainer; + private Resource totalResource; + + // excludes reserved resource + private Resource allocatedResource; + + // total - allocated + private Resource availableResource; + + // just a shortcut to reservedContainer.getReservedResource.
+ private Resource reservedResource; + + private NodeId nodeId; + + public static TempSchedulerNode fromSchedulerNode( + FiCaSchedulerNode schedulerNode) { + TempSchedulerNode n = new TempSchedulerNode(); + n.totalResource = Resources.clone(schedulerNode.getTotalResource()); + n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource()); + n.runningContainers = schedulerNode.getCopiedListOfRunningContainers(); + n.reservedContainer = schedulerNode.getReservedContainer(); + if (n.reservedContainer != null) { + n.reservedResource = n.reservedContainer.getReservedResource(); + } else { + n.reservedResource = Resources.none(); + } + n.availableResource = Resources.subtract(n.totalResource, + n.allocatedResource); + n.nodeId = schedulerNode.getNodeID(); + return n; + } + + public NodeId getNodeId() { + return nodeId; + } + + public List getRunningContainers() { + return runningContainers; + } + + public void setRunningContainers(List runningContainers) { + this.runningContainers = runningContainers; + } + + public RMContainer getReservedContainer() { + return reservedContainer; + } + + public void setReservedContainer(RMContainer reservedContainer) { + this.reservedContainer = reservedContainer; + } + + public Resource getTotalResource() { + return totalResource; + } + + public void setTotalResource(Resource totalResource) { + this.totalResource = totalResource; + } + + public Resource getAllocatedResource() { + return allocatedResource; + } + + public void setAllocatedResource(Resource allocatedResource) { + this.allocatedResource = allocatedResource; + } + + public Resource getAvailableResource() { + return availableResource; + } + + public void setAvailableResource(Resource availableResource) { + this.availableResource = availableResource; + } + + public Resource getReservedResource() { + return reservedResource; + } + + public void setReservedResource(Resource reservedResource) { + this.reservedResource = reservedResource; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 79709a3..72ce1a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -556,7 +556,12 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.reservedResource = e.getReservedResource(); container.reservedNode = e.getReservedNode(); container.reservedSchedulerKey = e.getReservedSchedulerKey(); - + + Container c = container.getContainer(); + if (c != null) { + c.setNodeId(container.reservedNode); + } + if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED) .contains(container.getState())) { // When container's state != NEW/RESERVED, it is an increase reservation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 759db05..dc1bc7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -360,7 +360,7 @@ public synchronized RMContainer getReservedContainer() { * Set the reserved container in the node. * @param reservedContainer Reserved container in the node. */ - protected synchronized void + public synchronized void setReservedContainer(RMContainer reservedContainer) { this.reservedContainer = reservedContainer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index d1fa410..12741e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -109,6 +109,8 @@ protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; + volatile Priority priority = Priority.newInstance(0); + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); @@ -336,6 +338,9 @@ void setupQueueConfigs(Resource clusterResource) csContext.getConfiguration().getReservationContinueLook(); this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + + this.priority = csContext.getConfiguration().getQueuePriority( + getQueuePath()); } finally { writeLock.unlock(); } @@ -923,4 +928,9 @@ protected void appFinished() { this.writeLock.unlock(); } } + + @Override + public Priority getPriority() { + return this.priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index e30ec39..2e3ced5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; @@ -372,4 +373,10 @@ void apply(Resource cluster, */ public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException; + + /** + * Get priority of queue + * @return queue priority + */ + Priority getPriority(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 55ffe25..1bdaafd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -271,16 +271,6 @@ public void setResourceCalculator(ResourceCalculator rc) { } @Override - public Comparator getNonPartitionedQueueComparator() { - return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR; - } - - @Override - public PartitionedQueueComparator getPartitionedQueueComparator() { - return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR; - } - - @Override public int getNumClusterNodes() { return nodeTracker.nodeCount(); } @@ -2487,4 +2477,98 @@ public int getAsyncSchedulingPendingBacklogs() { public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { return this.queueManager; } + + /** + * Try to move a reserved container to a targetNode. + * If the targetNode is reserved by another application (other than this one). + * The previous reservation will be cancelled. + * + * @param toBeMovedContainer reserved container will be moved + * @param toBeUnreservedContainer the reserved container on the targetNode + * which will be unreserved, can be null + * @param targetNode targetNode + * @return true if move succeeded. Return false if the targetNode is reserved by + * a different container or move failed because of any other reasons. + */ + public boolean moveReservedContainer(RMContainer toBeMovedContainer, + RMContainer toBeUnreservedContainer, FiCaSchedulerNode targetNode) { + try { + writeLock.lock(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to move container=" + toBeMovedContainer + " to node=" + + targetNode.getNodeID()); + } + + FiCaSchedulerNode sourceNode = getNode(toBeMovedContainer.getNodeId()); + if (null == sourceNode) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to move reservation, cannot find source node=" + + toBeMovedContainer.getNodeId()); + } + return false; + } + + // Target node updated? + if (getNode(targetNode.getNodeID()) != targetNode) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Failed to move reservation, node updated or removed, moving " + + "cancelled."); + } + return false; + } + + // Target node's reservation status changed? 
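// (These checks guard against staleness: the preemption monitor computed
// its plan outside the scheduler's write lock, so by the time we get here
// the source node may be gone, the target node instance may have been
// replaced, or the target's reservation may no longer be the one we
// planned to unreserve. Any mismatch aborts the move rather than acting
// on stale state.)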
+ if (targetNode.getReservedContainer() != toBeUnreservedContainer) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Target node's reservation status changed, moving cancelled."); + } + return false; + } + + FiCaSchedulerApp app = getApplicationAttempt( + toBeMovedContainer.getApplicationAttemptId()); + if (null == app) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot find to-be-moved container's application=" + + toBeUnreservedContainer.getApplicationAttemptId()); + } + return false; + } + + if (toBeUnreservedContainer != null) { + FiCaSchedulerApp toBeUnreservedApp = getApplicationAttempt( + toBeUnreservedContainer.getApplicationAttemptId()); + if (null == toBeUnreservedApp) { + if (LOG.isDebugEnabled()) { + LOG.debug("Need to unreserved container, but target app=" + + toBeUnreservedContainer.getApplicationAttemptId() + + " cannot be found"); + } + return false; + } + + // Try to unreserve + toBeUnreservedApp.unreserve( + toBeUnreservedContainer.getReservedSchedulerKey(), targetNode, + toBeUnreservedContainer); + + // Regardless of unreserve status, check if the targetNode's reservation + // is cleaned + if (targetNode.getReservedContainer() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to unreserve container=" + toBeUnreservedContainer + .getContainerId()); + } + } + } + + // finally, move the reserved container + return app.moveReservation(toBeMovedContainer, sourceNode, targetNode); + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfaeba4..5aeeabe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,19 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.StringTokenizer; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -38,17 +27,21 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -57,7 +50,17 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.StringTokenizer; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -125,14 +128,21 @@ @Private public static final String MAXIMUM_ALLOCATION_VCORES = "maximum-allocation-vcores"; - + + /** + * Ordering policy of queues + */ public static final String ORDERING_POLICY = "ordering-policy"; - - public static final String FIFO_ORDERING_POLICY = "fifo"; - public static final String FAIR_ORDERING_POLICY = "fair"; + /* + * Ordering policy inside a leaf queue to sort apps + */ + public static final String FIFO_APP_ORDERING_POLICY = "fifo"; - public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY; + public static final String FAIR_APP_ORDERING_POLICY = "fair"; + + public static final String DEFAULT_APP_ORDERING_POLICY = + FIFO_APP_ORDERING_POLICY; @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -294,6 +304,11 @@ static String getQueuePrefix(String queue) { String queueName = PREFIX + queue + DOT; return queueName; } + + static String getQueueOrderingPolicyPrefix(String queue) { + String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT; + return queueName; + } private String getNodeLabelPrefix(String queue, String label) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { @@ -396,20 +411,23 @@ public int getUserLimit(String queue) { DEFAULT_USER_LIMIT); return userLimit; } - + + // TODO (wangda): We need to better distinguish app ordering policy and queue + // ordering policy's classname / configuration options, etc. And dedup code + // if possible. 
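// As the constants above and below suggest, the same per-queue key shape
// (assuming the standard yarn.scheduler.capacity.<queue-path>.ordering-policy
// prefix built by getQueuePrefix()) now serves two concepts, which is what
// this TODO is about:
//   - on a leaf queue it selects the app ordering policy: "fifo" | "fair"
//   - on a parent queue it selects the queue ordering policy:
//     "utilization" | "utilization-priority"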
@SuppressWarnings("unchecked") - public OrderingPolicy getOrderingPolicy( + public OrderingPolicy getAppOrderingPolicy( String queue) { - String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, - DEFAULT_ORDERING_POLICY); + String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, + DEFAULT_APP_ORDERING_POLICY); OrderingPolicy orderingPolicy; - if (policyType.trim().equals(FIFO_ORDERING_POLICY)) { + if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) { policyType = FifoOrderingPolicy.class.getName(); } - if (policyType.trim().equals(FAIR_ORDERING_POLICY)) { + if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) { policyType = FairOrderingPolicy.class.getName(); } try { @@ -692,6 +710,20 @@ public Resource getMaximumAllocation() { return Resources.createResource(maximumMemory, maximumCores); } + @Private + public Priority getQueuePriority(String queue) { + String queuePolicyPrefix = getQueuePrefix(queue); + Priority pri = Priority.newInstance( + getInt(queuePolicyPrefix + "priority", 0)); + return pri; + } + + @Private + public void setQueuePriority(String queue, int priority) { + String queuePolicyPrefix = getQueuePrefix(queue); + setInt(queuePolicyPrefix + "priority", priority); + } + /** * Get the per queue setting for the maximum limit to allocate to * each container request. @@ -1162,4 +1194,95 @@ public int getGlobalMaximumApplicationsPerQueue() { getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED); return maxApplicationsPerQueue; } + + /** + * Ordering policy inside a parent queue to sort queues + */ + + /** + * Less relative usage queue can get next resource, this is default + */ + public static final String QUEUE_UTILIZATION_ORDERING_POLICY = "utilization"; + + /** + * Combination of relative usage and priority + */ + public static final String QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY = + "utilization-priority"; + + public static final String DEFAULT_QUEUE_ORDERING_POLICY = + QUEUE_UTILIZATION_ORDERING_POLICY; + + + @Private + public void setQueueOrderingPolicy(String queue, String policy) { + set(getQueuePrefix(queue) + ORDERING_POLICY, policy); + } + + @Private + public QueueOrderingPolicy getQueueOrderingPolicy(String queue, + String parentPolicy) { + String defaultPolicy = parentPolicy; + if (null == defaultPolicy) { + defaultPolicy = DEFAULT_QUEUE_ORDERING_POLICY; + } + + String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, + defaultPolicy); + + QueueOrderingPolicy qop; + if (policyType.trim().equals(QUEUE_UTILIZATION_ORDERING_POLICY)) { + // Doesn't respect priority + qop = new PriorityUtilizationQueueOrderingPolicy(false); + } else if (policyType.trim().equals( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY)) { + qop = new PriorityUtilizationQueueOrderingPolicy(true); + } else { + String message = + "Unable to construct queue ordering policy=" + policyType + " queue=" + + queue; + throw new YarnRuntimeException(message); + } + + return qop; + } + + /** + * Get global configuration for ordering policies + */ + private String getOrderingPolicyGlobalConfigKey(String orderPolicyName, + String configKey) { + return PREFIX + ORDERING_POLICY + DOT + orderPolicyName + DOT + configKey; + } + + /** + * Global configurations of queue-priority-utilization ordering policy + */ + public boolean getPriorityUtilizationOrderingPolicyPreemptionEnabled() { + return getBoolean(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, "preemption.enabled"), + false); + } + + @VisibleForTesting + public void 
setPriorityUtilizationOrderingPolicyPreemptionEnabled( + boolean enabled) { + setBoolean(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, "preemption.enabled"), + enabled); + } + + public long getPriorityUtilizationOrderingPolicyPreemptionTimeout() { + return getLong(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, "preemption.timeout-ms"), + 60000L); + } + + @VisibleForTesting + public void setPriorityUtilizationOrderingPolicyPreemptionTimeoutMs( + long timeout) { + setLong(getOrderingPolicyGlobalConfigKey( + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, "preemption.timeout-ms"), + timeout); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7d29619..373f53b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -60,10 +60,6 @@ Configuration getConf(); ResourceCalculator getResourceCalculator(); - - Comparator getNonPartitionedQueueComparator(); - - PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 6a3c08a..2cfe292 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -71,9 +71,6 @@ public int compare(CSQueue q1, CSQueue q2) { } }; - static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR = - new PartitionedQueueComparator(); - static class QueueHook { public CSQueue hook(CSQueue queue) { return queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 18b38f4..e25cc62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -182,7 +182,7 @@ protected void setupQueueConfigs(Resource clusterResource) CapacitySchedulerConfiguration conf = csContext.getConfiguration(); setOrderingPolicy( - conf.getOrderingPolicy(getQueuePath())); + conf.getAppOrderingPolicy(getQueuePath())); userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 946fca3..93dbee4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -42,11 +42,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; @@ -59,14 +65,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; @Private @Evolving @@ -74,23 +76,20 @@ private static final Log LOG = LogFactory.getLog(ParentQueue.class); - protected final Set childQueues; + protected final List childQueues; private final boolean rootQueue; 
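The QueueOrderingPolicy type referenced here is not included in this excerpt; from its uses in this patch (setQueues, getAssignmentIterator, getConfigName), its shape is presumably close to the following sketch:

public interface QueueOrderingPolicy {
  // Called whenever the parent's child list is (re)built.
  void setQueues(List<CSQueue> queues);

  // Returns children in assignment order for the given node partition.
  Iterator<CSQueue> getAssignmentIterator(String partition);

  // The configured name, e.g. "utilization" or "utilization-priority".
  String getConfigName();
}

Keeping childQueues as a plain ArrayList and sorting on demand per partition, instead of the removed TreeSet with a fixed comparator, matches the multi-threaded-scheduling rationale given in sortAndGetChildrenAllocationIterator below.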
- private final Comparator nonPartitionedQueueComparator; - private final PartitionedQueueComparator partitionQueueComparator; private volatile int numApplications; private final CapacitySchedulerContext scheduler; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private QueueOrderingPolicy queueOrderingPolicy; + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; - this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator(); - this.partitionQueueComparator = new PartitionedQueueComparator(); - this.rootQueue = (parent == null); float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath()); @@ -102,7 +101,7 @@ public ParentQueue(CapacitySchedulerContext cs, ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - this.childQueues = new TreeSet(nonPartitionedQueueComparator); + this.childQueues = new ArrayList<>(); setupQueueConfigs(cs.getClusterResource()); @@ -111,7 +110,14 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - void setupQueueConfigs(Resource clusterResource) + // returns what is configured queue ordering policy + private String getQueueOrderingPolicyConfigName() { + return queueOrderingPolicy == null ? + null : + queueOrderingPolicy.getConfigName(); + } + + protected void setupQueueConfigs(Resource clusterResource) throws IOException { try { writeLock.lock(); @@ -129,13 +135,21 @@ void setupQueueConfigs(Resource clusterResource) } } + // Initialize queue ordering policy + queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy( + getQueuePath(), parent == null ? + null : + ((ParentQueue) parent).getQueueOrderingPolicyConfigName()); + queueOrderingPolicy.setQueues(childQueues); + LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" - + ", reservationsContinueLooking=" + reservationsContinueLooking); + + ", reservationsContinueLooking=" + reservationsContinueLooking + + ", orderingPolicy=" + getQueueOrderingPolicyConfigName()); } finally { writeLock.unlock(); } @@ -289,8 +303,8 @@ public void reinitialize(CSQueue newlyParsedQueue, // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! 
- Map currentChildQueues = getQueues(childQueues); - Map newChildQueues = getQueues( + Map currentChildQueues = getQueuesMap(childQueues); + Map newChildQueues = getQueuesMap( newlyParsedParentQueue.childQueues); for (Map.Entry e : newChildQueues.entrySet()) { String newChildQueueName = e.getKey(); @@ -325,7 +339,7 @@ public void reinitialize(CSQueue newlyParsedQueue, } } - Map getQueues(Set queues) { + private Map getQueuesMap(List queues) { Map queuesMap = new HashMap(); for (CSQueue queue : queues) { queuesMap.put(queue.getQueueName(), queue); @@ -667,13 +681,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, private Iterator sortAndGetChildrenAllocationIterator( String partition) { - // Previously we keep a sorted list for default partition, it is not good - // when multi-threading scheduler is enabled, so to make a simpler code - // now re-sort queue every time irrespective to node partition. - partitionQueueComparator.setPartitionToLookAt(partition); - List childrenList = new ArrayList<>(childQueues); - Collections.sort(childrenList, partitionQueueComparator); - return childrenList.iterator(); + return queueOrderingPolicy.getAssignmentIterator(partition); } private CSAssignment assignContainersToChildQueues(Resource cluster, @@ -1070,4 +1078,8 @@ public void stopQueue() { this.writeLock.unlock(); } } + + public QueueOrderingPolicy getQueueOrderingPolicy() { + return queueOrderingPolicy; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java deleted file mode 100644 index 477c615..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java +++ /dev/null @@ -1,72 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import java.util.Comparator; - -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; - -public class PartitionedQueueComparator implements Comparator { - private String partitionToLookAt = null; - - public void setPartitionToLookAt(String partitionToLookAt) { - this.partitionToLookAt = partitionToLookAt; - } - - - @Override - public int compare(CSQueue q1, CSQueue q2) { - /* - * 1. Check accessible to given partition, if one queue accessible and - * the other not, accessible queue goes first. 
- */ - boolean q1Accessible = - q1.getAccessibleNodeLabels().contains(partitionToLookAt) - || q1.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY); - boolean q2Accessible = - q2.getAccessibleNodeLabels().contains(partitionToLookAt) - || q2.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY); - if (q1Accessible && !q2Accessible) { - return -1; - } else if (!q1Accessible && q2Accessible) { - return 1; - } - - /* - * - * 2. When two queue has same accessibility, check who will go first: - * Now we simply compare their used resource on the partition to lookAt - */ - float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt); - float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt); - if (Math.abs(used1 - used2) < 1e-6) { - // When used capacity is same, compare their guaranteed-capacity - float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt); - float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt); - - // when cap1 == cap2, we will compare queue's name - if (Math.abs(cap1 - cap2) < 1e-6) { - return q1.getQueueName().compareTo(q2.getQueueName()); - } - return Float.compare(cap2, cap1); - } - - return Float.compare(used1, used2); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java new file mode 100644 index 0000000..fe60611 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.function.Supplier; + +/** + * For two queues with the same priority: + * - The queue with less relative used-capacity goes first - today’s behavior. 
+ * - The default priority for all queues is 0 and equal. So, we get today’s + * behavior at every level - the queue with the lowest used-capacity + * percentage gets the resources + * + * For two queues with different priorities: + * - Both the queues are under their guaranteed capacities: The queue with + * the higher priority gets resources + * - Both the queues are over or meeting their guaranteed capacities: + * The queue with the higher priority gets resources + * - One of the queues is over or meeting their guaranteed capacities and the + * other is under: The queue that is under its capacity guarantee gets the + * resources. + */ +public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPolicy { + private List queues; + private boolean respectPriority; + + // This lets multiple threads sort queues at the same time + // for different partitions. + private static ThreadLocal partitionToLookAt = + ThreadLocal.withInitial(new Supplier() { + @Override + public String get() { + return RMNodeLabelsManager.NO_LABEL; + } + }); + + /** + * Compare two queues with possibly different priorities and assigned + * capacities. Will be used by the preemption policy as well. + * + * @param relativeAssigned1 relativeAssigned1 + * @param relativeAssigned2 relativeAssigned2 + * @param priority1 p1 + * @param priority2 p2 + * @return compared result + */ + public static int compare(double relativeAssigned1, double relativeAssigned2, + int priority1, int priority2) { + if (priority1 == priority2) { + // The queue with less relative used-capacity goes first + return Double.compare(relativeAssigned1, relativeAssigned2); + } else { + // When priority is different: + if ((relativeAssigned1 < 1.0f && relativeAssigned2 < 1.0f) || ( + relativeAssigned1 >= 1.0f && relativeAssigned2 >= 1.0f)) { + // When both the queues are under their guaranteed capacities, + // or both the queues are over or meeting their guaranteed capacities, + // the queue with higher priority goes first + return Integer.compare(priority2, priority1); + } else { + // Otherwise, when one of the queues is over or meeting their + // guaranteed capacities and the other is under: The queue that is + // under its capacity guarantee gets the resources.
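+        // E.g., with hypothetical numbers: relativeAssigned1 = 0.6 (under its
+        // guarantee), relativeAssigned2 = 1.3 (over), priority1 = 1,
+        // priority2 = 5. Double.compare(0.6, 1.3) < 0, so the underserved
+        // queue still goes first despite its lower priority.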
+ return Double.compare(relativeAssigned1, relativeAssigned2); + } + } + } + + /** + * Comparator that looks at both priority and utilization + */ + private class PriorityQueueComparator implements Comparator { + + @Override + public int compare(CSQueue q1, CSQueue q2) { + String p = partitionToLookAt.get(); + + int rc = compareQueueAccessToPartition(q1, q2, p); + if (0 != rc) { + return rc; + } + + float used1 = q1.getQueueCapacities().getUsedCapacity(p); + float used2 = q2.getQueueCapacities().getUsedCapacity(p); + int p1 = 0; + int p2 = 0; + if (respectPriority) { + p1 = q1.getPriority().getPriority(); + p2 = q2.getPriority().getPriority(); + } + + rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2); + + // For queues with the same used ratio / priority, the queue with higher + // configured capacity goes first + if (0 == rc) { + float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p); + float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p); + return Float.compare(abs2, abs1); + } + + return rc; + } + + private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, String partition) { + // Everybody has access to default partition + if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) { + return 0; + } + + /* + * Check accessibility to the given partition; if one queue is accessible + * and the other is not, the accessible queue goes first. + */ + // Guard against a null accessible-label set before the contains() checks. + boolean q1Accessible = + q1.getAccessibleNodeLabels() != null && (q1.getAccessibleNodeLabels() + .contains(partition) || q1.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY)); + boolean q2Accessible = + q2.getAccessibleNodeLabels() != null && (q2.getAccessibleNodeLabels() + .contains(partition) || q2.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY)); + if (q1Accessible && !q2Accessible) { + return -1; + } else if (!q1Accessible && q2Accessible) { + return 1; + } + + return 0; + } + } + + public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) { + this.respectPriority = respectPriority; + } + + @Override + public void setQueues(List queues) { + this.queues = queues; + } + + @Override + public Iterator getAssignmentIterator(String partition) { + // Since partitionToLookAt is a thread-local variable, and we copy and + // sort a snapshot of the queues on every call, this is safe in a + // multi-threaded environment.
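+    // The thread-local carries the partition into PriorityQueueComparator,
+    // since Comparator.compare(q1, q2) cannot take the partition as an
+    // extra argument.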
+ PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); + List sortedQueue = new ArrayList<>(queues); + Collections.sort(sortedQueue, new PriorityQueueComparator()); + return sortedQueue.iterator(); + } + + @Override + public String getConfigName() { + if (respectPriority) { + return CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY; + } else{ + return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java new file mode 100644 index 0000000..ff63c1a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +import java.util.Iterator; +import java.util.List; + +/** + * This will be used by + * {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue} + * to decide allocation ordering of child queues. + */ +public interface QueueOrderingPolicy { + void setQueues(List queues); + + /** + * Return an iterator over the collection of CSQueues which orders + * them for container assignment. 
+ */ + Iterator getAssignmentIterator(String partition); + + /** + * Returns the configuration name (which will be used to set the ordering + * policy) + * @return configuration name + */ + String getConfigName(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4329335..5e3b9be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -98,13 +99,13 @@ private final Set containersToPreempt = new HashSet(); - + private CapacityHeadroomProvider headroomProvider; private ResourceCalculator rc = new DefaultResourceCalculator(); private ResourceScheduler scheduler; - + private AbstractContainerAllocator containerAllocator; /** @@ -115,7 +116,7 @@ private Map toBeRemovedIncRequests = new ConcurrentHashMap<>(); - public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { this(applicationAttemptId, user, queue, activeUsersManager, rmContext, @@ -831,7 +832,7 @@ public NodeId getNodeIdToUnreserve( } return null; } - public void setHeadroomProvider( CapacityHeadroomProvider headroomProvider) { try { @@ -841,7 +842,7 @@ public void setHeadroomProvider( writeLock.unlock(); } } - @Override public Resource getHeadroom() { try { @@ -855,7 +856,7 @@ public Resource getHeadroom() { } } - @Override public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { @@ -867,7 +868,7 @@ public void transferStateFromPreviousAttempt( writeLock.unlock(); } } - public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer, Resource reservedResource) { @@ -1148,4 +1149,85 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + /** + * Move a reservation from one node to another. + * Compared to unreserving the container on the source node and reserving a + * new container on the target node, this method will not create a new + * RMContainer instance, and this operation is atomic.
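+   *
+   * The queue-priority preemption logic is expected to use this when a
+   * reservation sits on a node where it cannot be satisfied; see
+   * testPriorityPreemptionRequiresMoveReservation later in this patch, where
+   * a reserved container is moved from n3 to n2 rather than being cancelled
+   * and re-created.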
+ * + * @param reservedContainer to be moved reserved container + * @param sourceNode source node + * @param targetNode target node + * + * @return succeeded or not + */ + public boolean moveReservation(RMContainer reservedContainer, + FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) { + try { + writeLock.lock(); + if (!sourceNode.getPartition().equals(targetNode.getPartition())) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Failed to move reservation, two nodes are in different partition"); + } + return false; + } + + // Update reserved container to node map + Map map = reservedContainers.get( + reservedContainer.getReservedSchedulerKey()); + if (null == map) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot find reserved container map."); + } + return false; + } + + // Check if reserved container changed + if (sourceNode.getReservedContainer() != reservedContainer) { + if (LOG.isDebugEnabled()) { + LOG.debug("To-be-moved container already updated."); + } + return false; + } + + // Check if target node is empty, acquires lock of target node to make sure + // reservation happens transactional + synchronized (targetNode){ + if (targetNode.getReservedContainer() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Target node is already occupied before moving"); + } + } + + try { + targetNode.reserveResource(this, + reservedContainer.getReservedSchedulerKey(), reservedContainer); + } catch (IllegalStateException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reserve on target node failed, e=", e); + } + return false; + } + + // Set source node's reserved container to null + sourceNode.setReservedContainer(null); + map.remove(sourceNode.getNodeID()); + + // Update reserved container + reservedContainer.handle( + new RMContainerReservedEvent(reservedContainer.getContainerId(), + reservedContainer.getReservedResource(), targetNode.getNodeID(), + reservedContainer.getReservedSchedulerKey())); + + // Add to target node + map.put(targetNode.getNodeID(), reservedContainer); + + return true; + } + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 83ff52b..4b6cf9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import 
@@ -61,6 +62,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -476,6 +478,14 @@ private Resource parseResourceFromString(String p) { * -B... * * ";" splits queues, and there should no empty lines, no extra spaces + * + * For each queue, there are configurations to specify capacities (for each + * partition); the format is: +
+   * -<queueName> (<partition-name-1>=[guaranteed max used pending], \
+   *               <partition-name-2>=[guaranteed max used pending])
+   *              {key1=value1,key2=value2};  // Additional configs
+   * 
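+   *
+   * For example (this exact form appears in the tests below):
+   *   -a(=[10 100 35 50]){priority=1,disable_preemption=true}
+   * declares queue a with guaranteed=10, max=100, used=35, pending=50 on the
+   * default partition, priority 1, and preemption disabled.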
*/ @SuppressWarnings({ "unchecked", "rawtypes" }) private ParentQueue mockQueueHierarchy(String queueExprs) { @@ -491,6 +501,10 @@ private ParentQueue mockQueueHierarchy(String queueExprs) { queue = parentQueue; List children = new ArrayList(); when(parentQueue.getChildQueues()).thenReturn(children); + QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class); + when(policy.getConfigName()).thenReturn( + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy); } else { LeafQueue leafQueue = mock(LeafQueue.class); final TreeSet apps = new TreeSet<>( @@ -623,11 +637,57 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray, when(queue.getPreemptionDisabled()).thenReturn( conf.getPreemptionDisabled(queuePath, false)); + // Setup other queue configurations + Map otherConfigs = getOtherConfigurations( + queueExprArray[idx]); + if (otherConfigs.containsKey("priority")) { + when(queue.getPriority()).thenReturn( + Priority.newInstance(Integer.valueOf(otherConfigs.get("priority")))); + } else { + // set queue's priority to 0 by default + when(queue.getPriority()).thenReturn(Priority.newInstance(0)); + } + + // Setup disable preemption of queues + if (otherConfigs.containsKey("disable_preemption")) { + when(queue.getPreemptionDisabled()).thenReturn( + Boolean.valueOf(otherConfigs.get("disable_preemption"))); + } + nameToCSQueues.put(queueName, queue); when(cs.getQueue(eq(queueName))).thenReturn(queue); } /** + * Get additional queue's configurations + * @param queueExpr queue expr + * @return maps of configs + */ + private Map getOtherConfigurations(String queueExpr) { + if (queueExpr.contains("{")) { + int left = queueExpr.indexOf('{'); + int right = queueExpr.indexOf('}'); + + if (right > left) { + Map configs = new HashMap<>(); + + String subStr = queueExpr.substring(left + 1, right); + for (String kv : subStr.split(",")) { + if (kv.contains("=")) { + String key = kv.substring(0, kv.indexOf("=")); + String value = kv.substring(kv.indexOf("=") + 1); + configs.put(key, value); + } + } + + return configs; + } + } + + return Collections.EMPTY_MAP; + } + + /** * Level of a queue is how many "-" at beginning, root's level is 0 */ private int getLevel(String q) { @@ -737,6 +797,10 @@ public void checkPendingResource(CSQueue queue, String partition, int pending) { Assert.assertEquals(pending, ru.getPending(partition).getMemorySize()); } + public void checkPriority(CSQueue queue, int expectedPriority) { + Assert.assertEquals(expectedPriority, queue.getPriority().getPriority()); + } + public void checkReservedResource(CSQueue queue, String partition, int reserved) { ResourceUsage ru = queue.getQueueResourceUsage(); Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java new file mode 100644 index 0000000..2b54d77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -0,0 +1,361 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestPreemptionForQueueWithPriorities + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testPreemptionForHighestPriorityUnderutilizedQueue() + throws IOException { + /** + * The simplest test of queues with priorities. Queue structure is: + * + *
+     *        root
+     *       / |  \
+     *      a  b   c
+     * 
+ * + * For priorities + * - a=1 + * - b/c=2 + * + * So c will preempt more resources from a, until a reaches its guaranteed + * resource. + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 40 50]){priority=1};" + // a + "-b(=[30 100 59 50]){priority=2};" + // b + "-c(=[40 100 1 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,40,false);" + // app1 in a + "b\t(1,1,n1,,59,false);" + // app2 in b + "c\t(1,1,n1,,1,false);"; // app3 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 10 preempted from app1, 15 preempted from app2, and nothing preempted + // from app3 + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testPreemptionForLowestPriorityUnderutilizedQueue() + throws IOException { + /** + * Similar to above; make sure the less utilized queue can still get + * resources first regardless of priority. + * + * Queue structure is: + * + *
+     *        root
+     *       / |  \
+     *      a  b   c
+     * 
+ * + * For priorities + * - a=1 + * - b=2 + * - c=0 + * + * So c will preempt more resources from a, until a reaches its guaranteed + * resource. + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 40 50]){priority=1};" + // a + "-b(=[30 100 59 50]){priority=2};" + // b + "-c(=[40 100 1 25]){priority=0}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,40,false);" + // app1 in a + "b\t(1,1,n1,,59,false);" + // app2 in b + "c\t(1,1,n1,,1,false);"; // app3 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 10 preempted from app1, 15 preempted from app2, and nothing preempted + // from app3 + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testPreemptionWontHappenBetweenSatisfiedQueues() + throws IOException { + /** + * No preemption happens if a queue is already satisfied, regardless of + * priority + * + * Queue structure is: + * + *
+     *        root
+     *       / |  \
+     *      a  b   c
+     * 
+ * + * For priorities + * - a=1 + * - b=1 + * - c=2 + * + * When c is satisfied, it will not preempt any resources from other queues + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 0 0]){priority=1};" + // a + "-b(=[30 100 40 50]){priority=1};" + // b + "-c(=[40 100 60 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t(1,1,n1,,40,false);" + // app1 in b + "c\t(1,1,n1,,60,false)"; // app2 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Nothing preempted + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets() + throws IOException { + /** + * When a cluster has different priorities and each priority has multiple + * queues, the preemption policy should try to balance resources between + * queues with the same priority by the ratio of their capacities + * + * Queue structure is: + * + *
+     * root
+     * - a (capacity=10), p=1
+     * - b (capacity=15), p=1
+     * - c (capacity=20), p=2
+     * - d (capacity=25), p=2
+     * - e (capacity=30), p=2
+     * 
+ */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[10 100 35 50]){priority=1};" + // a + "-b(=[15 100 25 50]){priority=1};" + // b + "-c(=[20 100 39 50]){priority=2};" + // c + "-d(=[25 100 0 0]){priority=2};" + // d + "-e(=[30 100 1 99]){priority=2}"; // e + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,35,false);" + // app1 in a + "b\t(1,1,n1,,25,false);" + // app2 in b + "c\t(1,1,n1,,39,false);" + // app3 in c + "e\t(1,1,n1,,1,false)"; // app4 in e + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 23 preempted from app1, 6 preempted from app2, and nothing preempted + // from app3/app4 + // (After preemption, a has 35 - 23 = 12 and b has 25 - 6 = 19, so b:a after + // preemption is 19/12 = 1.58, close to the capacity ratio 15/10 = 1.50) + verify(mDisp, times(23)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(6)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testPreemptionForPriorityAndDisablePreemption() + throws IOException { + /** + * When a cluster has different priorities and each priority has multiple + * queues, the preemption policy should try to balance resources between + * queues with the same priority by the ratio of their capacities. + * + * But we also need to make sure that disabling preemption is honored + * regardless of priority. + * + * Queue structure is: + * + *
+     * root
+     * - a (capacity=10), p=1
+     * - b (capacity=15), p=1
+     * - c (capacity=20), p=2
+     * - d (capacity=25), p=2
+     * - e (capacity=30), p=2
+     * 
+ */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a + "-b(=[15 100 25 50]){priority=1};" + // b + "-c(=[20 100 39 50]){priority=2};" + // c + "-d(=[25 100 0 0]){priority=2};" + // d + "-e(=[30 100 1 99]){priority=2}"; // e + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,35,false);" + // app1 in a + "b\t(1,1,n1,,25,false);" + // app2 in b + "c\t(1,1,n1,,39,false);" + // app3 in c + "e\t(1,1,n1,,1,false)"; // app4 in e + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // We would normally preempt some resources from a, but since queue a + // disables preemption, we need to preempt some resources from b and + // some from c, even though c has higher priority than a + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testPriorityPreemptionForHierarchicalOfQueues() + throws IOException { + /** + * When queues form a multi-level hierarchy with different priorities: + * + *
+     * root
+     * - a (capacity=30), p=1
+     *   - a1 (capacity=40), p=1
+     *   - a2 (capacity=60), p=1
+     * - b (capacity=30), p=1
+     *   - b1 (capacity=50), p=1
+     *   - b2 (capacity=50), p=2
+     * - c (capacity=40), p=1
+     * 
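+     *
+     * A reading of the expected numbers below (derived from the asserted
+     * preemption counts, not a statement of the algorithm itself): c (used 1,
+     * pending 30) needs up to 30 more; a and b have equal priority and equal
+     * guarantees, so both are balanced down to 35 (a loses 5, b loses 24);
+     * within b, the higher-priority b2 is cut less (9) than b1 (15).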
+ */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 40 50]){priority=1};" + // a + "--a1(=[12 100 20 50]){priority=1};" + // a1 + "--a2(=[18 100 20 50]){priority=1};" + // a2 + "-b(=[30 100 59 50]){priority=1};" + // b + "--b1(=[15 100 30 50]){priority=1};" + // b1 + "--b2(=[15 100 29 50]){priority=2};" + // b2 + "-c(=[40 100 1 30]){priority=1}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a1\t(1,1,n1,,20,false);" + // app1 in a1 + "a2\t(1,1,n1,,20,false);" + // app2 in a2 + "b1\t(1,1,n1,,30,false);" + // app3 in b1 + "b2\t(1,1,n1,,29,false);" + // app4 in b2 + "c\t(1,1,n1,,29,false)"; // app5 in c + + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Preemption should first divide capacities between a / b, and b2 should + // get less preemption than b1 (because b2 has higher priority) + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index ad1490a..7bed28a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -219,7 +220,9 @@ public void testProportionalPreemption() { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); + + // A will preempt guaranteed-allocated. 
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); } @Test @@ -587,8 +590,8 @@ public void testOverCapacityImbalance() { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // correct imbalance between over-capacity queues - verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + // Will not preempt for over capacity queues + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } @Test @@ -701,7 +704,7 @@ public void testZeroGuar() { public void testZeroGuarOverCap() { int[][] qData = new int[][] { // / A B C D E F - { 200, 100, 0, 99, 0, 100, 100 }, // abs + { 200, 100, 0, 100, 0, 100, 100 }, // abs { 200, 200, 200, 200, 200, 200, 200 }, // maxCap { 170, 170, 60, 20, 90, 0, 0 }, // used { 85, 50, 30, 10, 10, 20, 20 }, // pending @@ -712,14 +715,14 @@ public void testZeroGuarOverCap() { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // we verify both that C has priority on B and D (has it has >0 guarantees) - // and that B and D are force to share their over capacity fairly (as they - // are both zero-guarantees) hence D sees some of its containers preempted - verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC))); + // No preemption should happen because zero guaranteed queues should be + // treated as always satisfied, they should not preempt from each other. + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); } - - @Test public void testHierarchicalLarge() { int[][] qData = new int[][] { @@ -1206,6 +1209,13 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues, when(pq.getChildQueues()).thenReturn(cqs); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); when(pq.getReadLock()).thenReturn(lock.readLock()); + + // Ordering policy + QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class); + when(policy.getConfigName()).thenReturn( + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + when(pq.getQueueOrderingPolicy()).thenReturn(policy); + when(pq.getPriority()).thenReturn(Priority.newInstance(0)); for (int i = 0; i < subqueues; ++i) { pqs.add(pq); } @@ -1269,6 +1279,7 @@ public Object answer(InvocationOnMock invocation) { } ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); when(lq.getReadLock()).thenReturn(lock.readLock()); + when(lq.getPriority()).thenReturn(Priority.newInstance(0)); p.getChildQueues().add(lq); return lq; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index e31a889..1fd455a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -95,7 +95,7 @@ public void testNodePartitionPreemptionRespectGuaranteedCapacity() } @Test - public void testNodePartitionPreemptionRespectMaximumCapacity() + public void testNodePartitionPreemptionNotHappenBetweenSatisfiedQueues() throws IOException { /** * Queue structure is: @@ -114,8 +114,8 @@ public void testNodePartitionPreemptionRespectMaximumCapacity() * 2 apps in cluster. * app1 in b and app2 in c. * - * app1 uses 90x, and app2 use 10x. After preemption, app2 will preempt 10x - * from app1 because of max capacity. + * app1 uses 90x, and app2 use 10x. We don't expect preemption happen + * between them because all of them are satisfied */ String labelsConfig = "=100,true;" + // default partition @@ -139,9 +139,8 @@ public void testNodePartitionPreemptionRespectMaximumCapacity() buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); policy.editSchedule(); - // 30 preempted from app1, 30 preempted from app4, and nothing preempted - // from app2/app3 - verify(mDisp, times(20)).handle( + // No preemption happens + verify(mDisp, never()).handle( argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); verify(mDisp, never()).handle( argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java index 07d1eef..964a230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java @@ -46,8 +46,8 @@ public void testBuilder() throws Exception { "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1 - "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2 - "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])"; + "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]){priority=2};" + // a2 + "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0]){priority=1,disable_preemption=true}"; String appsConfig= //queueName\t(priority,resource,host,expression,#repeat,reserved) // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated) @@ -75,6 +75,7 @@ public void testBuilder() throws Exception { checkPendingResource(cs.getQueue("root"), "red", 100); checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f); checkPendingResource(cs.getQueue("root"), "blue", 200); + checkPriority(cs.getQueue("root"), 0); // default // a checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f); @@ -83,6 +84,7 @@ public void testBuilder() throws Exception { 
checkPendingResource(cs.getQueue("a"), "red", 0); checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f); checkPendingResource(cs.getQueue("a"), "blue", 200); + checkPriority(cs.getQueue("a"), 0); // default // a1 checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f); @@ -91,6 +93,7 @@ public void testBuilder() throws Exception { checkPendingResource(cs.getQueue("a1"), "red", 0); checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f); checkPendingResource(cs.getQueue("a1"), "blue", 0); + checkPriority(cs.getQueue("a1"), 0); // default // a2 checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f); @@ -99,14 +102,18 @@ public void testBuilder() throws Exception { checkPendingResource(cs.getQueue("a2"), "red", 0); checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f); checkPendingResource(cs.getQueue("a2"), "blue", 200); + checkPriority(cs.getQueue("a2"), 2); + Assert.assertFalse(cs.getQueue("a2").getPreemptionDisabled()); - // b1 + // b checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f); checkPendingResource(cs.getQueue("b"), "", 0); checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f); checkPendingResource(cs.getQueue("b"), "red", 100); checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f); checkPendingResource(cs.getQueue("b"), "blue", 0); + checkPriority(cs.getQueue("b"), 1); + Assert.assertTrue(cs.getQueue("b").getPreemptionDisabled()); // Check ignored partitioned containers in queue Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1")) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java index bd9f615..aa6dbab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java @@ -46,7 +46,7 @@ final int GB = 1024; - Configuration conf; + CapacitySchedulerConfiguration conf; RMNodeLabelsManager mgr; @@ -54,13 +54,15 @@ @Before void setUp() throws Exception { - conf = new YarnConfiguration(); + conf = new CapacitySchedulerConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class); - conf = TestUtils.getConfigurationWithMultipleQueues(this.conf); + conf = (CapacitySchedulerConfiguration) TestUtils + .getConfigurationWithMultipleQueues(this.conf); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB); // Set preemption related configurations conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 7382f3d..046ea4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -110,9 +110,6 @@ public void setUp() throws IOException { thenReturn(Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); - when(csContext.getNonPartitionedQueueComparator()). - thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -276,9 +273,6 @@ public void testLimitsComputation() throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 16)); - when(csContext.getNonPartitionedQueueComparator()). - thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -581,9 +575,6 @@ public void testHeadroom() throws Exception { thenReturn(Resources.createResource(GB)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB)); - when(csContext.getNonPartitionedQueueComparator()). 
- thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 1f87c53..2fa06e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -594,9 +594,6 @@ public void testHeadroom() throws Exception { .thenReturn(Resources.createResource(GB)); when(csContext.getMaximumResourceCapability()) .thenReturn(Resources.createResource(16 * GB)); - when(csContext.getNonPartitionedQueueComparator()) - .thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); RMContext rmContext = TestUtils.getMockRMContext(); RMContext spyRMContext = spy(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index db6115c..ccf8098 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -167,8 +167,7 @@ public void testSurgicalPreemptionWithAvailableResource() * * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. * - * 2) app1 submit to queue-a first, it asked 38 * 1G containers - * We will allocate 20 on n1 and 19 on n2. + * 2) app1 submit to queue-b, asks for 1G * 5 * * 3) app2 submit to queue-c, ask for one 4G container (for AM) * @@ -243,4 +242,225 @@ public void testSurgicalPreemptionWithAvailableResource() rm1.close(); } + + @Test(timeout = 60000) + public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * 
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 submit to queue-b first, it asks for 6 * 1G containers on top
+     *    of its 1G AM, so it ends up with 7 * 1G containers:
+     *    4 on n1 (including the AM) and 3 on n2.
+     *
+     * 3) app2 submit to queue-c, ask for one 18G container (for AM)
+     *
+     * After preemption, we should expect:
+     * Preempt 2 containers from app1 and AM of app2 successfully allocated.
+     */
+    conf.setPriorityUtilizationOrderingPolicyPreemptionEnabled(true);
+    conf.setPriorityUtilizationOrderingPolicyPreemptionTimeoutMs(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue-b, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, so the abs-used-cap of b is
+    // 7 / 40 = 17.5% < 20% (guaranteed)
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    // 4 from n1 and 3 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 4);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 3);
+
+    // Submit app2 to queue-c and ask for an 18G container for its AM
+    RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    // Call editSchedule immediately: containers are not selected
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Sleep the timeout interval, we should be able to see containers selected
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed, and the new
+    // AM container is launched
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+
+    rm1.close();
+  }
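
The assertions above hang on the 1000ms value fed to setPriorityUtilizationOrderingPolicyPreemptionTimeoutMs: the first editSchedule() call selects nothing, and only after the sleep does the second call pick the two 1G containers on n1. A minimal, self-contained sketch of that gate follows; the class and method names are hypothetical, only the timeout semantics come from the test:

    // Illustrative sketch, not the patch's selector: the shape of the
    // "don't preempt for priority until the demand has aged" check that
    // the two editSchedule() calls above observe.
    final class PriorityPreemptionGate {
      private final long timeoutMs;      // cf. ...PreemptionTimeoutMs(1000)
      private long demandSeenAtMs = -1;  // when unsatisfied demand appeared

      PriorityPreemptionGate(long timeoutMs) {
        this.timeoutMs = timeoutMs;
      }

      /** True once the demand has been outstanding longer than timeoutMs. */
      boolean mayPreempt(long nowMs) {
        if (demandSeenAtMs < 0) {
          demandSeenAtMs = nowMs; // first call: arm the timer, select nothing
          return false;
        }
        return nowMs - demandSeenAtMs >= timeoutMs;
      }

      public static void main(String[] args) throws InterruptedException {
        PriorityPreemptionGate gate = new PriorityPreemptionGate(1000);
        System.out.println(gate.mayPreempt(System.currentTimeMillis())); // false
        Thread.sleep(1000); // mirrors the Thread.sleep(1000) in the test
        System.out.println(gate.mayPreempt(System.currentTimeMillis())); // true
      }
    }
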
+
+  @Test(timeout = 300000)
+  public void testPriorityPreemptionRequiresMoveReservation()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues,
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 3 nodes in the cluster, 10G for each
+     *
+     * 2) app1 submit to queue-b first, it asks for 2 * 2G containers:
+     *    its 2G AM runs on n1 and the 2 * 2G containers are allocated on n2
+     *
+     * 3) app2 submit to queue-c, with 2G AM container (allocated on n3)
+     *    app2 requires 9G resource, which will be reserved on n3
+     *
+     * We should expect the container to be unreserved from n3, re-reserved
+     * on n2, and allocated on n2 once app1's containers are preempted
+     */
+    conf.setPriorityUtilizationOrderingPolicyPreemptionEnabled(true);
+    conf.setPriorityUtilizationOrderingPolicyPreemptionTimeoutMs(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());
+
+    // launch an app to queue-b, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 2 * GB, 2, new ArrayList<>());
+
+    // Do allocation for node2 twice
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+
+    // 1 from n1 and 2 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 1);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 2);
+
+    // Submit app2 to queue-c and ask for a 2G AM container, on n3
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask for 1 * 9G container
+    am2.allocate("*", 9 * GB, 1, new ArrayList<>());
+
+    // Do allocation for node3 once
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
+
+    // Make sure container reserved on node3
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Call editSchedule immediately: nothing happens
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Sleep the timeout interval, we should be able to see the reserved
+    // container moved to n2 (it cannot fit on n1 beside app1's 2G AM)
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+    Assert.assertNotNull(
+        cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+    Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(
+        rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());
+
+    // Do it again, we should see containers marked to be preempted
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (schedulerApp2.getLiveContainers().size() < 2) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      Thread.sleep(200);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+    rm1.close();
+  }
 }
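
For reference, the arithmetic behind the move the assertions check: n3 is a 10G node carrying app2's own 2G AM, so at most 8G can be freed for the 9G request; n2 carries two preemptable 2G containers of app1 plus 6G of free space, so preempting both yields 10G. A self-contained sketch of that fit check (hypothetical helper, not code from the patch):

    // Can requestGB fit on a node once its preemptable containers are freed?
    public final class ReservationFit {
      static boolean fitsAfterPreemption(int nodeGB, int usedGB,
          int preemptableGB, int requestGB) {
        return (nodeGB - usedGB) + preemptableGB >= requestGB;
      }

      public static void main(String[] args) {
        // n3: 10G node, app2's own 2G AM cannot be preempted -> 8G < 9G
        System.out.println(fitsAfterPreemption(10, 2, 0, 9));  // false
        // n2: 10G node, 4G used by two preemptable 2G app1 containers,
        // 6G free -> 6G + 4G = 10G >= 9G
        System.out.println(fitsAfterPreemption(10, 4, 4, 9));  // true
      }
    }
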
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index cf91841..0b3172e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -98,9 +98,6 @@ public void setUp() throws Exception {
         Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 1ab29dd..46df197 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -691,4 +691,114 @@ public void testContinuousReservationLookingWhenUsedEqualsMax() throws Exception
     rm1.close();
   }

+
+  @Test(timeout = 60000)
+  public void testQueuePriorityOrdering() throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        (CapacitySchedulerConfiguration) TestUtils
+            .getConfigurationWithMultipleQueues(conf);
+
+    // Set ordering policy
+    newConf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Set maximum capacity of queue-a to 20
+    newConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 20);
+    newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+    newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
+    newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 3);
+
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 100 * GB);
+
+    // launch an app to queue A, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch an app to queue B, AM container should be launched in nm1
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // launch an app to queue C, AM container should be launched in nm1
+    RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+    // Each application asks 10 * 5GB containers
+    am1.allocate("*", 5 * GB, 10, null);
+    am2.allocate("*", 5 * GB, 10, null);
+    am3.allocate("*", 5 * GB, 10, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp3 =
+        cs.getApplicationAttempt(am3.getApplicationAttemptId());
+
+    // Container will be allocated to am1, since queue-a has the highest
+    // priority (3). App1 now has 2 live containers (1 allocated + AM)
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
+
+    // Container will be allocated to am1 again.
+    // App1 now has 3 live containers (2 allocated + AM)
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
+
+    // (Now usages of queues: a=12G (satisfied), b=2G, c=2G)
+
+    // Container will be allocated to am2, since queue-a is now beyond its
+    // guaranteed capacity.
+    // App2 now has 2 live containers (1 allocated + AM)
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
+
+    // Do this 3 times: containers keep going to am2 until queue-b is beyond
+    // its guaranteed capacity as well.
+    // App2 now has 5 live containers (4 allocated + AM)
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
+
+    // (Now usages of queues: a=12G (satisfied), b=22G (satisfied), c=2G)
+
+    // Do this 10 times: the remaining containers go to am3
+    for (int i = 0; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
+
+    // (Now usages of queues: a=12G (satisfied), b=22G (satisfied),
+    // c=52G (satisfied and no pending))
+
+    // Do this 20 times: only 2 more containers can be allocated, 1 to a and
+    // 1 to b, after which just 4G of the 100G node is left, less than one
+    // 5G container
+    for (int i = 0; i < 20; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(6, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
+
+    // (Now usages of queues: a=17G (satisfied), b=27G (satisfied), c=52G)
+
+    rm1.close();
+  }
 }
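
The "(Now usages ...)" comments in this test track absolute used capacity against the 10/20/70 guarantees on the single 100G node, with queue a further capped at 20% maximum capacity. A small self-contained sketch of that bookkeeping (hypothetical names; the 12G and 17G figures come from the assertions above):

    // Absolute used capacity vs. guaranteed/max, on the 100G test cluster.
    public final class QueueUsage {
      static double absUsedPct(int usedGB, int clusterGB) {
        return 100.0 * usedGB / clusterGB;
      }

      public static void main(String[] args) {
        // After two heartbeats: queue a holds 2G AM + 2 * 5G = 12G
        System.out.println(absUsedPct(12, 100)); // 12.0 > 10 (guaranteed)
        // a may still grow toward its 20% maximum-capacity cap (20G);
        // it ends the test at 17G
        System.out.println(absUsedPct(17, 100)); // 17.0 < 20 (max)
      }
    }
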
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index b2695bc..7d9e4a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -175,9 +175,6 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
         thenReturn(Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -414,7 +411,7 @@ public void testPolicyConfiguration() throws Exception {
         "testPolicyRoot" + System.currentTimeMillis();

     OrderingPolicy<FiCaSchedulerApp> comPol =
-        testConf.getOrderingPolicy(tproot);
+        testConf.getAppOrderingPolicy(tproot);

   }
@@ -489,16 +486,16 @@ public void testFairConfiguration() throws Exception {
         "testPolicyRoot" + System.currentTimeMillis();

     OrderingPolicy<FiCaSchedulerApp> schedOrder =
-        testConf.getOrderingPolicy(tproot);
+        testConf.getAppOrderingPolicy(tproot);

     //override default to fair
     String policyType = CapacitySchedulerConfiguration.PREFIX + tproot + "."
         + CapacitySchedulerConfiguration.ORDERING_POLICY;
     testConf.set(policyType,
-        CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
+        CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
     schedOrder =
-        testConf.getOrderingPolicy(tproot);
+        testConf.getAppOrderingPolicy(tproot);
     FairOrderingPolicy fop = (FairOrderingPolicy) schedOrder;
     assertFalse(fop.getSizeBasedWeight());
@@ -508,7 +505,7 @@ public void testFairConfiguration() throws Exception {
         FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT;
     testConf.set(sbwConfig, "true");
     schedOrder =
-        testConf.getOrderingPolicy(tproot);
+        testConf.getAppOrderingPolicy(tproot);
     fop = (FairOrderingPolicy) schedOrder;
     assertTrue(fop.getSizeBasedWeight());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index a36db44..a57d4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -96,9 +96,6 @@ public void setUp() throws Exception {
         Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getResourceCalculator()).
         thenReturn(resourceComparator);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 3a154b2..5e6548b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -133,8 +133,6 @@ private void setup(CapacitySchedulerConfiguration csConf,
         Resources.createResource(16 * GB, 12));
     when(csContext.getClusterResource()).thenReturn(
         Resources.createResource(100 * 16 * GB, 100 * 12));
-    when(csContext.getNonPartitionedQueueComparator()).thenReturn(
-        CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
new file mode 100644
index 0000000..e3c108a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPriorityUtilizationQueueOrderingPolicy {
+  private List<CSQueue> mockCSQueues(String[] queueNames, int[] priorities,
+      float[] utilizations, String partition) {
+    // sanity check
+    assert queueNames != null && priorities != null && utilizations != null
+        && queueNames.length > 0 && queueNames.length == priorities.length
+        && priorities.length == utilizations.length;
+
+    List<CSQueue> list = new ArrayList<>();
+    for (int i = 0; i < queueNames.length; i++) {
+      CSQueue q = mock(CSQueue.class);
+      when(q.getQueueName()).thenReturn(queueNames[i]);
+
+      QueueCapacities qc = new QueueCapacities(false);
+      qc.setUsedCapacity(partition, utilizations[i]);
+
+      when(q.getQueueCapacities()).thenReturn(qc);
+      when(q.getPriority()).thenReturn(Priority.newInstance(priorities[i]));
+
+      list.add(q);
+    }
+
+    return list;
+  }
+
+  private void verifyOrder(QueueOrderingPolicy orderingPolicy,
+      String partition, String[] expectedOrder) {
+    Iterator<CSQueue> iter = orderingPolicy.getAssignmentIterator(partition);
+    int i = 0;
+    while (iter.hasNext()) {
+      CSQueue q = iter.next();
+      Assert.assertEquals(expectedOrder[i], q.getQueueName());
+      i++;
+    }
+
+    assert i == expectedOrder.length;
+  }
+
+  @Test
+  public void testUtilizationOrdering() {
+    PriorityUtilizationQueueOrderingPolicy policy =
+        new PriorityUtilizationQueueOrderingPolicy(false);
+
+    // Case 1, one queue
+    policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 0 },
+        new float[] { 0.1f }, ""));
+    verifyOrder(policy, "", new String[] { "a" });
+
+    // Case 2, 2 queues
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 0, 0 },
+        new float[] { 0.1f, 0.0f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a" });
+
+    // Case 3, 3 queues
+    policy.setQueues(
+        mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 0, 0, 0 },
+            new float[] { 0.1f, 0.0f, 0.2f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a", "c" });
+
+    // Case 4, 3 queues, priorities are ignored by this policy
+    policy.setQueues(
+        mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+            new float[] { 0.1f, 0.0f, 0.2f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a", "c" });
+
+    // Case 5, 3 queues, look at partition (default)
+    policy.setQueues(
+        mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+            new float[] { 0.1f, 0.0f, 0.2f }, "x"));
+    verifyOrder(policy, "", new String[] { "a", "b", "c" });
+
+    // Case 6, 3 queues, look at partition (x)
+    policy.setQueues(
+        mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
+            new float[] { 0.1f, 0.0f, 0.2f }, "x"));
+    verifyOrder(policy, "x", new String[] { "b", "a", "c" });
+
+    // Case 7, 3 queues, with different accessibility to partition
+    List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c" },
+        new int[] { 2, 1, 0 }, new float[] { 0.1f, 0.0f, 0.2f }, "x");
+    // a can access "x"
+    when(queues.get(0).getAccessibleNodeLabels())
+        .thenReturn(ImmutableSet.of("x", "y"));
+    // c can access "x"
+    when(queues.get(2).getAccessibleNodeLabels())
+        .thenReturn(ImmutableSet.of("x", "y"));
+    policy.setQueues(queues);
+    verifyOrder(policy, "x", new String[] { "a", "c", "b" });
+  }
+
+  @Test
+  public void testPriorityUtilizationOrdering() {
+    PriorityUtilizationQueueOrderingPolicy policy =
+        new PriorityUtilizationQueueOrderingPolicy(true);
+
+    // Case 1, one queue
+    policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 1 },
+        new float[] { 0.1f }, ""));
+    verifyOrder(policy, "", new String[] { "a" });
+
+    // Case 2, 2 queues, both under utilized, same priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+        new float[] { 0.2f, 0.1f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a" });
+
+    // Case 3, 2 queues, both over utilized, same priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+        new float[] { 1.1f, 1.2f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 4, 2 queues, one under and one over, same priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+        new float[] { 0.1f, 1.2f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 5, 2 queues, both over utilized, different priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+        new float[] { 1.1f, 1.2f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a" });
+
+    // Case 6, 2 queues, both under utilized, different priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+        new float[] { 0.1f, 0.2f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a" });
+
+    // Case 7, 2 queues, one under utilized and one over utilized,
+    // different priority (1)
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+        new float[] { 0.1f, 1.2f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 8, 2 queues, one under utilized and one over utilized,
+    // different priority (2)
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 },
+        new float[] { 0.1f, 1.2f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 9, 2 queues, one under utilized and one meet, different priority (1)
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+        new float[] { 0.1f, 1.0f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 10, 2 queues, one under utilized and one meet, different priority (2)
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 },
+        new float[] { 0.1f, 1.0f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 11, 2 queues, one under utilized and one meet, same priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
+        new float[] { 0.1f, 1.0f }, ""));
+    verifyOrder(policy, "", new String[] { "a", "b" });
+
+    // Case 12, 2 queues, both meet, different priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
+        new float[] { 1.0f, 1.0f }, ""));
+    verifyOrder(policy, "", new String[] { "b", "a" });
+
+    // Case 13, 5 queues, different priority
+    policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+        new int[] { 1, 2, 0, 0, 3 },
+        new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, ""));
+    verifyOrder(policy, "", new String[] { "e", "c", "b", "a", "d" });
+
+    // Case 14, 5 queues, different priority, partition default
+    policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+        new int[] { 1, 2, 0, 0, 3 },
+        new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"));
+    verifyOrder(policy, "", new String[] { "e", "b", "a", "c", "d" });
+
+    // Case 15, 5 queues, different priority, partition x
+    policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+        new int[] { 1, 2, 0, 0, 3 },
+        new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"));
+    verifyOrder(policy, "x", new String[] { "e", "c", "b", "a", "d" });
+
+    // Case 16, 5 queues, different priority, partition x, and different
+    // accessibility
+    List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
+        new int[] { 1, 2, 0, 0, 3 },
+        new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x");
+    // Only a/d have access to x
+    when(queues.get(0).getAccessibleNodeLabels()).thenReturn(
+        ImmutableSet.of("x"));
+    when(queues.get(3).getAccessibleNodeLabels()).thenReturn(
+        ImmutableSet.of("x"));
+    policy.setQueues(queues);
+    verifyOrder(policy, "x", new String[] { "a", "d", "e", "c", "b" });
+  }
+}
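
Read together, cases 2-13 pin down the ordering rule the policy is expected to implement: underserved queues (used capacity below 100%) come before satisfied or over-used ones; within each group the higher-priority queue wins; equal priorities fall back to lower utilization first. The following is a self-contained comparator consistent with those expectations; it is a sketch under that reading, not the patch's PriorityUtilizationQueueOrderingPolicy:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    final class QueueOrderSketch {
      static final class Q {
        final String name; final int priority; final float used;
        Q(String name, int priority, float used) {
          this.name = name; this.priority = priority; this.used = used;
        }
      }

      // Underserved group first, then higher priority, then lower usage.
      static final Comparator<Q> ORDER = Comparator
          .comparing((Q q) -> q.used >= 1.0f)
          .thenComparing(q -> -q.priority)
          .thenComparing(q -> q.used);

      public static void main(String[] args) {
        // Case 13 above: expect e, c, b, a, d
        List<Q> qs = new ArrayList<>(Arrays.asList(
            new Q("a", 1, 1.2f), new Q("b", 2, 1.0f), new Q("c", 0, 0.2f),
            new Q("d", 0, 1.1f), new Q("e", 3, 0.2f)));
        qs.sort(ORDER);
        qs.forEach(q -> System.out.print(q.name + " ")); // prints: e c b a d
      }
    }
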
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 98cfdab..37fc3b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -20,23 +20,14 @@
 import java.util.*;

-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.Resources;

 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -157,7 +148,7 @@ public void testSizeBasedWeightNotAffectAppActivation() throws Exception {

     // Define top-level queues
     String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
-    csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
+    csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
     csConf.setOrderingPolicyParameter(queuePath,
         FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
     csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);