diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityMonitorPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityMonitorPolicy.java new file mode 100644 index 0000000..10113a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityMonitorPolicy.java @@ -0,0 +1,809 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueuePartitionEntity; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.collect.ImmutableSet;
+
+/**
+ * This class implements a {@link SchedulingEditPolicy} that is designed to be
+ * paired with the {@code CapacityScheduler}. At every invocation of {@code
+ * editSchedule()} it computes the ideal amount of resources assigned to each
+ * queue (for each queue in the hierarchy), and determines whether preemption
+ * is needed. Overcapacity is distributed among queues in a weighted fair
+ * manner, where the weight is the amount of guaranteed capacity for the queue.
+ * Based on this ideal assignment it determines whether preemption is required
+ * and selects a set of containers from each application that would be killed
+ * if the corresponding amount of resources is not freed up by the application.
+ *
+ * If not in {@code observeOnly} mode, it triggers preemption requests via a
+ * {@link ContainerPreemptEvent} that the {@code ResourceManager} will ensure
+ * to deliver to the application (or to execute).
+ *
+ * If the deficit of resources is persistent over a long enough period of time
+ * this policy will trigger forced termination of containers (again by
+ * generating {@link ContainerPreemptEvent}).
+ */
+public class ProportionalCapacityMonitorPolicy implements SchedulingEditPolicy {
+
+  private static final Log LOG =
+      LogFactory.getLog(ProportionalCapacityMonitorPolicy.class);
+
+  /** If true, run the policy but do not affect the cluster with preemption and
+   * kill events. */
+  public static final String OBSERVE_ONLY =
+      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  /** Time in milliseconds between invocations of this policy. */
+  public static final String MONITORING_INTERVAL =
+      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+  /** Time in milliseconds between requesting a preemption from an application
+   * and killing the container. */
+  public static final String WAIT_TIME_BEFORE_KILL =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+  /** Maximum percentage of resources preempted in a single round. By
+   * controlling this value one can throttle the pace at which containers are
+   * reclaimed from the cluster. After computing the total desired preemption,
+   * the policy scales it back within this limit. */
+  public static final String TOTAL_PREEMPTION_PER_ROUND =
+      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+  /** Maximum amount of resources above the target capacity ignored for
+   * preemption. This defines a deadzone around the target capacity that helps
+   * prevent thrashing and oscillations around the computed target balance.
+   * High values would slow the time to capacity and (absent natural
+   * completions) might prevent convergence to guaranteed capacity. */
+  public static final String MAX_IGNORED_OVER_CAPACITY =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #WAIT_TIME_BEFORE_KILL}, even absent natural termination.
+   */
+  public static final String NATURAL_TERMINATION_FACTOR =
+      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+
+  private final Clock clock;
+  private double maxIgnoredOverCapacity;
+  private CapacityScheduler scheduler;
+  private long monitoringInterval;
+  private ResourceCalculator rc;
+  private float percentageClusterPreemptionAllowed;
+  private double naturalTerminationFactor;
+  private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
+      new HashMap<>();
+  private RMNodeLabelsManager nlm;
+  private PreemptionManager preemptionManager;
+
+  public ProportionalCapacityMonitorPolicy() {
+    clock = new SystemClock();
+  }
+
+  public ProportionalCapacityMonitorPolicy(Configuration config,
+      RMContext context, CapacityScheduler scheduler) {
+    this(config, context, scheduler, new SystemClock());
+  }
+
+  public ProportionalCapacityMonitorPolicy(Configuration config,
+      RMContext context, CapacityScheduler scheduler, Clock clock) {
+    init(config, context, scheduler);
+    this.clock = clock;
+  }
+
+  public void init(Configuration config, RMContext context,
+      PreemptableResourceScheduler sched) {
+    LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
+    assert null == scheduler : "Unexpected duplicate call to init";
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class "
+          + sched.getClass().getCanonicalName() + " not instance of "
+          + CapacityScheduler.class.getCanonicalName());
+    }
+    scheduler = (CapacityScheduler) sched;
+    preemptionManager = scheduler.getPreemptionManager();
+    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
+    naturalTerminationFactor =
+        config.getDouble(NATURAL_TERMINATION_FACTOR, 1.0);
+    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
+    percentageClusterPreemptionAllowed =
+        config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
+    rc = scheduler.getResourceCalculator();
+    nlm = scheduler.getRMContext().getNodeLabelManager();
+  }
+
+  @VisibleForTesting
+  public ResourceCalculator getResourceCalculator() {
+    return rc;
+  }
+
+  @Override
+  public void editSchedule() {
+    CSQueue root = scheduler.getRootQueue();
+    Resource clusterResources = Resources.clone(scheduler.getClusterResource());
+    containerBasedPreemptOrKill(root, clusterResources);
+  }
+
+  /**
+   * This method selects and tracks containers to be preempted. If a container
+   * is in the target list for more than maxWaitTime it is killed.
+   *
+   * @param root the root of the CapacityScheduler queue hierarchy
+   * @param clusterResources the total amount of resources in the cluster
+   */
+  private void containerBasedPreemptOrKill(CSQueue root,
+      Resource clusterResources) {
+    // All partitions to look at
+    Set<String> allPartitions = new HashSet<>();
+    allPartitions.addAll(scheduler.getRMContext()
+        .getNodeLabelManager().getClusterNodeLabelNames());
+    allPartitions.add(RMNodeLabelsManager.NO_LABEL);
+
+    // extract a summary of the queues from scheduler
+    synchronized (scheduler) {
+      queueToPartitions.clear();
+
+      for (String partitionToLookAt : allPartitions) {
+        cloneQueues(root,
+            nlm.getResourceByLabel(partitionToLookAt, clusterResources),
+            partitionToLookAt);
+      }
+    }
+
+    // compute total preemption allowed
+    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
+        percentageClusterPreemptionAllowed);
+
+    Set<String> leafQueueNames = null;
+    for (String partition : allPartitions) {
+      TempQueuePerPartition tRoot =
+          getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+      // compute the ideal distribution of resources among queues
+      // updates cloned queues state accordingly
+      tRoot.idealAssigned = tRoot.guaranteed;
+
+      leafQueueNames =
+          recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+    }
+
+    // based on ideal allocation select containers to be preempted from each
+    // queue and each application
+    getContainersToPreempt(leafQueueNames, clusterResources);
+  }
+
+  /**
+   * This method recursively computes the ideal assignment of resources to each
+   * level of the hierarchy. This ensures that leaves that are over capacity
+   * but whose parents are within capacity will not be preempted. Preemptions
+   * are allowed within each subtree according to local over/under capacity.
+   *
+   * @param root the root of the cloned queue hierarchy
+   * @param totalPreemptionAllowed maximum amount of preemption allowed
+   * @return a list of leaf queues updated with preemption targets
+   */
+  private Set<String> recursivelyComputeIdealAssignment(
+      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
+    Set<String> leafQueueNames = new HashSet<>();
+    if (root.getChildren() != null &&
+        root.getChildren().size() > 0) {
+      // compute ideal distribution at this level
+      computeIdealResourceDistribution(rc, root.getChildren(),
+          totalPreemptionAllowed, root.idealAssigned);
+      // compute recursively for lower levels and build list of leaves
+      for (TempQueuePerPartition t : root.getChildren()) {
+        leafQueueNames.addAll(recursivelyComputeIdealAssignment(t,
+            totalPreemptionAllowed));
+      }
+    } else {
+      // we are in a leaf, nothing to do, just return yourself
+      return ImmutableSet.of(root.queueName);
+    }
+    return leafQueueNames;
+  }
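+
+  // Example (hypothetical numbers): if root.idealAssigned = <100GB, 100
+  // vcores> with two children A (guaranteed 40%) and B (guaranteed 60%), both
+  // with unsatisfied pending demand, one level of
+  // computeIdealResourceDistribution() yields A.idealAssigned = <40GB, 40>
+  // and B.idealAssigned = <60GB, 60>; the recursion then splits each child's
+  // idealAssigned among its own children in the same way.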
+
+  /**
+   * This method computes (for a single level in the tree, passed as a {@code
+   * List<TempQueuePerPartition>}) the ideal assignment of resources. This is
+   * done recursively to allocate capacity fairly across all queues with
+   * pending demands. It terminates when no resources are left to assign, or
+   * when all demand is satisfied.
+   *
+   * @param rc resource calculator
+   * @param queues a list of cloned queues to be assigned capacity to (this is
+   *          an out param)
+   * @param totalPreemptionAllowed total amount of preemption we allow
+   * @param tot_guarant the amount of capacity assigned to this pool of queues
+   */
+  private void computeIdealResourceDistribution(ResourceCalculator rc,
+      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
+      Resource tot_guarant) {
+
+    // qAlloc tracks currently active queues (will decrease progressively as
+    // demand is met)
+    List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
+    // unassigned tracks how much resources are still to assign, initialized
+    // with the total capacity for this set of queues
+    Resource unassigned = Resources.clone(tot_guarant);
+
+    // group queues based on whether they have non-zero guaranteed capacity
+    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
+    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
+
+    for (TempQueuePerPartition q : qAlloc) {
+      if (Resources
+          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+        nonZeroGuarQueues.add(q);
+      } else {
+        zeroGuarQueues.add(q);
+      }
+    }
+
+    // first compute the allocation as a fixpoint based on guaranteed capacity
+    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+        false);
+
+    // if any capacity is left unassigned, distribute it among zero-guarantee
+    // queues uniformly (i.e., not based on guaranteed capacity, as this is
+    // zero)
+    if (!zeroGuarQueues.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+          true);
+    }
+
+    // based on ideal assignment computed above and current assignment we
+    // derive how much preemption is required overall
+    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
+    for (TempQueuePerPartition t : queues) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded,
+            Resources.subtract(t.current, t.idealAssigned));
+      }
+    }
+
+    // if we need to preempt more than is allowed, compute a factor (0<f<1)
+    // that is used to scale down how much we ask back from each queue
+    float scalingFactor = 1.0F;
+    if (Resources.greaterThan(rc, tot_guarant,
+        totPreemptionNeeded, totalPreemptionAllowed)) {
+      scalingFactor = Resources.divide(rc, tot_guarant,
+          totalPreemptionAllowed, totPreemptionNeeded);
+    }
+
+    // loop through all queues and apply the computed scalingFactor
+    for (TempQueuePerPartition t : queues) {
+      t.assignPreemption(scalingFactor, rc, tot_guarant);
+    }
+  }
+
+  /**
+   * Given a set of queues, compute the fixpoint distribution of the unassigned
+   * resources among them. As the pending requests of a queue are exhausted,
+   * the queue is removed from the set and the remaining capacity is
+   * redistributed among the remaining queues, weighted by guaranteed capacity
+   * unless asked to ignoreGuarantee (in which case it is spread uniformly).
+   */
+  private void computeFixpointAllocation(ResourceCalculator rc,
+      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
+      Resource unassigned, boolean ignoreGuarantee) {
+    // Prior to assigning the unused resources, process each queue as follows:
+    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+    // Else idealAssigned = current;
+    // Subtract idealAssigned resources from unassigned.
+    // If the queue has all of its needs met (that is, if
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
+    PriorityQueue<TempQueuePerPartition> orderedByNeed =
+        new PriorityQueue<>(10, tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
+      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
+        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(q.current);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (current + pending), q needs more resources, so
+      // add it to the list of underserved queues, ordered by need.
+      Resource curPlusPend = Resources.add(q.current, q.pending);
+      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
+    // assign all cluster resources until no more demand, or no resources are
+    // left
+    while (!orderedByNeed.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueuePerPartition> underserved =
+          getMostUnderservedQueues(orderedByNeed, tqComparator);
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+        if (Resources.greaterThan(rc, tot_guarant,
+            wQdone, Resources.none())) {
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+  }
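+
+  // Example (hypothetical numbers): with unassigned = <30GB, 30 vcores> and
+  // two underserved queues whose guarantees normalize to 1/3 and 2/3, the
+  // loop above offers them <10GB, 10> and <20GB, 20> respectively. A queue
+  // that cannot accept its whole offer (capped by max-capacity or by
+  // current + pending) returns the idle remainder via offer(), and that
+  // remainder is redistributed in the next iteration of the fixpoint loop.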
+
+  // Take the most underserved TempQueue (the one at the head). Collect and
+  // return the list of all queues that have the same idealAssigned
+  // percentage of guaranteed.
+  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
+      PriorityQueue<TempQueuePerPartition> orderedByNeed,
+      TQComparator tqComparator) {
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
+    while (!orderedByNeed.isEmpty()) {
+      TempQueuePerPartition q1 = orderedByNeed.remove();
+      underserved.add(q1);
+      TempQueuePerPartition q2 = orderedByNeed.peek();
+      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+      // return what has already been collected. Otherwise, q1's pct of
+      // guaranteed == that of q2, so add q2 to the underserved list during the
+      // next pass.
+      if (q2 == null || tqComparator.compare(q1, q2) < 0) {
+        return underserved;
+      }
+    }
+    return underserved;
+  }
+
+  /**
+   * Computes a normalizedGuarantee capacity based on active queues.
+   * @param rc resource calculator
+   * @param clusterResource the total amount of resources in the cluster
+   * @param queues the list of queues to consider
+   * @param ignoreGuar if true, distribute capacity uniformly instead of
+   *          weighting by guaranteed capacity
+   */
+  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
+      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+    Resource activeCap = Resource.newInstance(0, 0);
+
+    if (ignoreGuar) {
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = 1.0f / queues.size();
+      }
+    } else {
+      for (TempQueuePerPartition q : queues) {
+        Resources.addTo(activeCap, q.guaranteed);
+      }
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.guaranteed, activeCap);
+      }
+    }
+  }
+
+  /**
+   * Based on a resource preemption target, drop reservations of containers
+   * and, if necessary, select containers for preemption from applications in
+   * each over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
+   * account for containers that will naturally complete.
+   *
+   * @param leafQueueNames set of leaf queues to preempt from
+   * @param clusterResource total amount of cluster resources
+   */
+  private void getContainersToPreempt(
+      Set<String> leafQueueNames, Resource clusterResource) {
+    // Loop over all leaf queues
+    List<PreemptableQueuePartitionEntity> queuePartitionsToUpdate =
+        new ArrayList<>();
+    for (String queueName : leafQueueNames) {
+      for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
+        if (Resources.greaterThanOrEqual(rc, clusterResource, qT.current,
+            qT.idealAssigned)) {
+
+          // we act only if we are violating balance by more than
+          // maxIgnoredOverCapacity
+          if (Resources.greaterThan(rc, clusterResource, qT.current, Resources
+              .multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+            // we introduce a dampening factor naturalTerminationFactor that
+            // accounts for natural termination of containers
+            Resource resToObtain =
+                Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+            // Only log resToObtain when it is > 0
+            if (Resources.greaterThan(rc, clusterResource, resToObtain,
+                Resources.none())) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Queue=" + queueName + " partition=" + qT.partition
+                    + " resource-to-obtain=" + resToObtain);
+              }
+            }
+            qT.toBePreempted = Resources.clone(resToObtain);
+          } else {
+            qT.toBePreempted = Resources.none();
+          }
+        }
+
+        queuePartitionsToUpdate.add(new PreemptableQueuePartitionEntity(
+            queueName, qT.partition, qT.idealAssigned, qT.toBePreempted));
+      }
+    }
+
+    preemptionManager.updatePreemptableQueuePartitions(queuePartitionsToUpdate);
+  }
+
+  @Override
+  public long getMonitoringInterval() {
+    return monitoringInterval;
+  }
+
+  @Override
+  public String getPolicyName() {
+    return "ProportionalCapacityPreemptionPolicy";
+  }
+
+  /**
+   * This method walks a tree of CSQueue and clones the portion of the state
+   * relevant for preemption in TempQueue(s). It also maintains a pointer to
+   * the leaves. Finally it aggregates pending resources in each queue and
+   * rolls them up to higher levels.
+   *
+   * @param curQueue the current queue being examined
+   * @param partitionResource the total amount of resources in the cluster
+   * @return the root of the cloned queue hierarchy
+   */
+  private TempQueuePerPartition cloneQueues(CSQueue curQueue,
+      Resource partitionResource, String partitionToLookAt) {
+    TempQueuePerPartition ret;
+    synchronized (curQueue) {
+      String queueName = curQueue.getQueueName();
+      QueueCapacities qc = curQueue.getQueueCapacities();
+      float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
+      float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
+      boolean preemptionDisabled = curQueue.getPreemptionDisabled();
+
+      Resource current = curQueue.getQueueResourceUsage().getUsed(
+          partitionToLookAt);
+      Resource guaranteed = Resources.multiply(partitionResource, absCap);
+      Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+
+      // when the partition is a non-exclusive partition, the actual
+      // maxCapacity could be more than the specified maxCapacity
+      try {
+        if (!scheduler.getRMContext().getNodeLabelManager()
+            .isExclusiveNodeLabel(partitionToLookAt)) {
+          maxCapacity =
+              Resources.max(rc, partitionResource, maxCapacity, current);
+        }
+      } catch (IOException e) {
+        // This may be caused by a partition being removed while the capacity
+        // monitor is running; just ignore the error, it will be corrected
+        // during the next check.
+      }
+
+      Resource extra = Resource.newInstance(0, 0);
+      if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
+        extra = Resources.subtract(current, guaranteed);
+      }
+      if (curQueue instanceof LeafQueue) {
+        LeafQueue l = (LeafQueue) curQueue;
+        Resource pending =
+            l.getQueueResourceUsage().getPending(partitionToLookAt);
+        ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
+            maxCapacity, preemptionDisabled, partitionToLookAt);
+        if (preemptionDisabled) {
+          ret.untouchableExtra = extra;
+        } else {
+          ret.preemptableExtra = extra;
+        }
+        ret.setLeafQueue(l);
+      } else {
+        Resource pending = Resource.newInstance(0, 0);
+        ret =
+            new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
+                guaranteed, maxCapacity, false, partitionToLookAt);
+        Resource childrensPreemptable = Resource.newInstance(0, 0);
+        for (CSQueue c : curQueue.getChildQueues()) {
+          TempQueuePerPartition subq =
+              cloneQueues(c, partitionResource, partitionToLookAt);
+          Resources.addTo(childrensPreemptable, subq.preemptableExtra);
+          ret.addChild(subq);
+        }
+        // untouchableExtra = max(extra - childrensPreemptable, 0)
+        if (Resources.greaterThanOrEqual(
+            rc, partitionResource, childrensPreemptable, extra)) {
+          ret.untouchableExtra = Resource.newInstance(0, 0);
+        } else {
+          ret.untouchableExtra =
+              Resources.subtract(extra, childrensPreemptable);
+        }
+        ret.preemptableExtra = Resources.min(
+            rc, partitionResource, childrensPreemptable, extra);
+      }
+    }
+    addTempQueuePartition(ret);
+    return ret;
+  }
+
+  private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
+    String queueName = queuePartition.queueName;
+
+    Map<String, TempQueuePerPartition> queuePartitions;
+    if (null == (queuePartitions = queueToPartitions.get(queueName))) {
+      queuePartitions = new HashMap<>();
+      queueToPartitions.put(queueName, queuePartitions);
+    }
+    queuePartitions.put(queuePartition.partition, queuePartition);
+  }
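+
+  // Shape of the bookkeeping map: queueToPartitions maps queue name ->
+  // (partition name -> TempQueuePerPartition), e.g. "root" -> {"" -> tq0,
+  // "labelX" -> tqX}, where "" is RMNodeLabelsManager.NO_LABEL and "labelX"
+  // is a hypothetical node label. One snapshot entry exists per
+  // (queue, partition) pair cloned above.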
+
+  /**
+   * Get the queue partition for a given queueName and partitionName.
+   */
+  private TempQueuePerPartition getQueueByPartition(String queueName,
+      String partition) {
+    Map<String, TempQueuePerPartition> partitionToQueues = null;
+    if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
+      return null;
+    }
+    return partitionToQueues.get(partition);
+  }
+
+  /**
+   * Get all queue partitions for a given queueName.
+   */
+  private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
+    if (!queueToPartitions.containsKey(queueName)) {
+      return null;
+    }
+    return queueToPartitions.get(queueName).values();
+  }
+
+  /**
+   * Temporary data structure tracking resource availability, pending resource
+   * need, and current utilization. This is a per-queue-per-partition data
+   * structure.
+   */
+  static class TempQueuePerPartition {
+    final String queueName;
+    final Resource current;
+    final Resource pending;
+    final Resource guaranteed;
+    final Resource maxCapacity;
+    final String partition;
+    Resource idealAssigned;
+    Resource toBePreempted;
+    // For logging purposes
+    Resource actuallyPreempted;
+    Resource untouchableExtra;
+    Resource preemptableExtra;
+
+    double normalizedGuarantee;
+
+    final ArrayList<TempQueuePerPartition> children;
+    LeafQueue leafQueue;
+    boolean preemptionDisabled;
+
+    TempQueuePerPartition(String queueName, Resource current, Resource pending,
+        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
+        String partition) {
+      this.queueName = queueName;
+      this.current = current;
+      this.pending = pending;
+      this.guaranteed = guaranteed;
+      this.maxCapacity = maxCapacity;
+      this.idealAssigned = Resource.newInstance(0, 0);
+      this.actuallyPreempted = Resource.newInstance(0, 0);
+      this.toBePreempted = Resource.newInstance(0, 0);
+      this.normalizedGuarantee = Float.NaN;
+      this.children = new ArrayList<>();
+      this.untouchableExtra = Resource.newInstance(0, 0);
+      this.preemptableExtra = Resource.newInstance(0, 0);
+      this.preemptionDisabled = preemptionDisabled;
+      this.partition = partition;
+    }
+
+    public void setLeafQueue(LeafQueue l) {
+      assert children.size() == 0;
+      this.leafQueue = l;
+    }
+
+    /**
+     * When adding a child we also aggregate its pending resource needs.
+     * @param q the child queue to add to this queue
+     */
+    public void addChild(TempQueuePerPartition q) {
+      assert leafQueue == null;
+      children.add(q);
+      Resources.addTo(pending, q.pending);
+    }
+
+    public void addChildren(ArrayList<TempQueuePerPartition> queues) {
+      assert leafQueue == null;
+      children.addAll(queues);
+    }
+
+    public ArrayList<TempQueuePerPartition> getChildren() {
+      return children;
+    }
+
+    // This function "accepts" all the resources it can (pending) and returns
+    // the unused ones
+    Resource offer(Resource avail, ResourceCalculator rc,
+        Resource clusterResource) {
+      Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
+          Resources.subtract(maxCapacity, idealAssigned),
+          Resource.newInstance(0, 0));
+      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+      Resource accepted =
+          Resources.min(rc, clusterResource,
+              absMaxCapIdealAssignedDelta,
+              Resources.min(rc, clusterResource, avail, Resources.subtract(
+                  Resources.add(current, pending), idealAssigned)));
+      Resource remain = Resources.subtract(avail, accepted);
+      Resources.addTo(idealAssigned, accepted);
+      return remain;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(" NAME: " + queueName)
+          .append(" CUR: ").append(current)
+          .append(" PEN: ").append(pending)
+          .append(" GAR: ").append(guaranteed)
+          .append(" NORM: ").append(normalizedGuarantee)
+          .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
+          .append(" IDEAL_PREEMPT: ").append(toBePreempted)
+          .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+          .append(" UNTOUCHABLE: ").append(untouchableExtra)
+          .append(" PREEMPTABLE: ").append(preemptableExtra)
+          .append("\n");
+
+      return sb.toString();
+    }
+
+    public void printAll() {
+      LOG.info(this.toString());
+      for (TempQueuePerPartition sub : this.getChildren()) {
+        sub.printAll();
+      }
+    }
+
+    public void assignPreemption(float scalingFactor,
+        ResourceCalculator rc, Resource clusterResource) {
+      if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
+        toBePreempted = Resources.multiply(
+            Resources.subtract(current, idealAssigned), scalingFactor);
+      } else {
+        // toBePreempted will be a negative value; it means this queue needs
+        // to preempt resources from other queues
+        toBePreempted = Resources.subtract(current, idealAssigned);
+      }
+    }
+
+    void appendLogString(StringBuilder sb) {
+      sb.append(queueName).append(", ")
+          .append(current.getMemory()).append(", ")
+          .append(current.getVirtualCores()).append(", ")
+          .append(pending.getMemory()).append(", ")
+          .append(pending.getVirtualCores()).append(", ")
+          .append(guaranteed.getMemory()).append(", ")
+          .append(guaranteed.getVirtualCores()).append(", ")
+          .append(idealAssigned.getMemory()).append(", ")
+          .append(idealAssigned.getVirtualCores()).append(", ")
+          .append(toBePreempted.getMemory()).append(", ")
+          .append(toBePreempted.getVirtualCores()).append(", ")
+          .append(actuallyPreempted.getMemory()).append(", ")
+          .append(actuallyPreempted.getVirtualCores());
+    }
+
+  }
+
+  static class TQComparator implements Comparator<TempQueuePerPartition> {
+    private ResourceCalculator rc;
+    private Resource clusterRes;
+
+    TQComparator(ResourceCalculator rc, Resource clusterRes) {
+      this.rc = rc;
+      this.clusterRes = clusterRes;
+    }
+
+    @Override
+    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
+      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+        return -1;
+      }
+      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+        return 1;
+      }
+      return 0;
+    }
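+
+    // Example (hypothetical numbers): a queue with idealAssigned = <2GB, 2>
+    // and guaranteed = <4GB, 4> (ratio 0.5) sorts before a queue with
+    // idealAssigned = <3GB, 3> and guaranteed = <3GB, 3> (ratio 1.0), so the
+    // most under-guaranteed queue is offered resources first.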
+
+    // Calculates idealAssigned / guaranteed.
+    // TempQueues with 0 guarantees are always considered the most over
+    // capacity and therefore considered last for resources.
+    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
+      double pctOver = Integer.MAX_VALUE;
+      if (q != null && Resources.greaterThan(
+          rc, clusterRes, q.guaranteed, Resources.none())) {
+        pctOver =
+            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+      }
+      return (pctOver);
+    }
+  }
+
+  @VisibleForTesting
+  public Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
+    return queueToPartitions;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index dc0d9ba..ddaf54c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -86,4 +86,8 @@ boolean hasIncreaseReservation();
 
   void cancelIncreaseReservation();
+
+  String getUser();
+
+  String getQueue();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 96c4f27..b149b33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -178,26 +178,33 @@ private List<ResourceRequest> resourceRequests;
 
   private volatile boolean hasIncreaseReservation = false;
 
+  private String queueName;
+  private boolean saveNonAMContainerMetaInfo;
+
   // Used by test only
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext) {
-    this(container, appAttemptId, nodeId, user, rmContext, System
-        .currentTimeMillis(), "");
+    this(container, appAttemptId, nodeId, user, rmContext, "");
   }
-
-  private boolean saveNonAMContainerMetaInfo;
-
+
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
-      RMContext rmContext, String nodeLabelExpression) {
+      RMContext rmContext, String queueName) {
     this(container, appAttemptId, nodeId, user, rmContext, System
-        .currentTimeMillis(), nodeLabelExpression);
+        .currentTimeMillis(), "", queueName);
   }
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
-      RMContext rmContext, long creationTime, String nodeLabelExpression) {
+      RMContext rmContext, String nodeLabelExpression, String queueName) {
+    this(container, appAttemptId, nodeId, user, rmContext, System
+        .currentTimeMillis(), nodeLabelExpression, queueName);
+  }
+
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext, long creationTime, String nodeLabelExpression,
+      String queueName) {
     this.stateMachine = stateMachineFactory.make(this);
     this.containerId = container.getId();
     this.nodeId = nodeId;
@@ -211,6 +218,7 @@ public RMContainerImpl(Container container,
     this.isAMContainer = false;
     this.resourceRequests = null;
     this.nodeLabelExpression = nodeLabelExpression;
+    this.queueName = queueName;
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -763,4 +771,16 @@ public boolean hasIncreaseReservation() {
   public void cancelIncreaseReservation() {
     hasIncreaseReservation = false;
   }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public String getQueue() {
+    // TODO: queueName could change if the app is moved from one queue to
+    // another; we need to handle that case as well.
+    return queueName;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index abd72bf..bf2ef89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -431,7 +431,8 @@ public synchronized void recoverContainersOnNode(
       }
 
       // create container
-      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+      RMContainer rmContainer = recoverAndCreateContainer(container, nm,
+          schedulerAttempt.getQueueName());
 
       // recover RMContainer
       rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
@@ -478,7 +479,7 @@ public synchronized void recoverContainersOnNode(
   }
 
   private RMContainer recoverAndCreateContainer(NMContainerStatus status,
-      RMNode node) {
+      RMNode node, String queueName) {
     Container container =
         Container.newInstance(status.getContainerId(), node.getNodeID(),
             node.getHttpAddress(), status.getAllocatedResource(),
@@ -488,7 +489,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status,
     RMContainer rmContainer =
         new RMContainerImpl(container, attemptId, node.getNodeID(),
             applications.get(attemptId.getApplicationId()).getUser(), rmContext,
-            status.getCreationTime(), status.getNodeLabelExpression());
+            status.getCreationTime(), status.getNodeLabelExpression(), queueName);
     return rmContainer;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index c5f8def..bc531d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -389,9 +389,9 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
       RMContainer rmContainer, Container container) {
     // Create RMContainer if necessary
     if (rmContainer == null) {
-      rmContainer =
-          new RMContainerImpl(container, getApplicationAttemptId(),
-              node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+      rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
+          node.getNodeID(), appSchedulingInfo.getUser(), rmContext,
+          getQueueName());
 
       attemptResourceUsage.incReserved(node.getPartition(),
           container.getResource());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index e8e1238..70ddf31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -202,6 +202,10 @@ public synchronized void decreaseContainer(ContainerId containerId,
   public synchronized Resource getAvailableResource() {
     return this.availableResource;
   }
+
+  public synchronized void setAvailableResource(Resource availableResource) {
+    this.availableResource = availableResource;
+  }
 
   /**
    * Get used resources on the node.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index b40ac27..dc00942 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -46,8 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -609,4 +609,12 @@ public Priority getDefaultApplicationPriority() { // TODO add dummy implementation return null; } + + @Override + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits resourceLimits, + SchedulingMode schedulingMode) { + return assignContainers(clusterResource, node, resourceLimits, + schedulingMode, false); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index e90deeb..7f2d166 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -200,6 +200,10 @@ public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode); + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, ResourceLimits resourceLimits, + SchedulingMode schedulingMode, boolean dryRun); + /** * A container assigned to the queue has completed. 
   * @param clusterResource the resource of the cluster
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 782ed03..a09629e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -139,6 +140,7 @@
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
   private YarnAuthorizationProvider authorizer;
+  private PreemptionManager preemptionManager;
 
   private CSQueue root;
   // timeout to join when we stop this service
@@ -236,6 +238,7 @@ public Configuration getConf() {
 
   public CapacityScheduler() {
     super(CapacityScheduler.class.getName());
+    preemptionManager = new PreemptionManager();
   }
 
   @Override
@@ -302,6 +305,7 @@ private synchronized void initScheduler(Configuration configuration) throws
     this.labelManager = rmContext.getNodeLabelManager();
     authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
     initializeQueues(this.conf);
+    preemptionManager.queueRefreshed(root);
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
     asyncScheduleInterval =
@@ -311,6 +315,9 @@ private synchronized void initScheduler(Configuration configuration) throws
       asyncSchedulerThread = new AsyncScheduleThread(this);
     }
 
+    // Init preemption manager
+    preemptionManager.init(this.calculator);
+
     LOG.info("Initialized CapacityScheduler with " +
         "calculator=" + getResourceCalculator().getClass() + ", " +
         "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
@@ -367,6 +374,9 @@ public void serviceStop() throws Exception {
       refreshMaximumAllocation(this.conf.getMaximumAllocation());
       throw new IOException("Failed to re-init queues", t);
     }
+
+    // Notify preemption manager when queue refreshed.
+    preemptionManager.queueRefreshed(root);
   }
 
   long getAsyncScheduleInterval() {
@@ -375,6 +385,18 @@
 
   private final static Random random = new Random(System.currentTimeMillis());
 
+  private static boolean dryrunForUpcomingAllocation(FiCaSchedulerNode node) {
+    // Do we need a dryrun to check preemption?
+    // TODO: the dryrun cadence is currently hard-coded to once per 10 ticks.
+    long tick = node.addAndGetTick();
+    boolean dryrun = false;
+    if (tick % 10 == 0) {
+      dryrun = true;
+    }
+
+    return dryrun;
+  }
+
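+  // Example of the resulting cadence (assuming the modulus above stays at
+  // 10): on every 10th tick of a node, callers first run
+  // allocateContainersToNode(node, true) as a preemption dryrun and then the
+  // normal allocateContainersToNode(node, false) pass; on all other ticks
+  // only the normal pass runs.
+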
   /**
    * Schedule on all nodes by starting at a random point.
    * @param cs
@@ -386,12 +408,22 @@ static void schedule(CapacityScheduler cs) {
     int start = random.nextInt(nodes.size());
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
-        cs.allocateContainersToNode(node);
+        boolean dryrun = CapacityScheduler.dryrunForUpcomingAllocation(node);
+        if (dryrun) {
+          cs.allocateContainersToNode(node, true);
+        }
+
+        cs.allocateContainersToNode(node, false);
       }
     }
     // Now, just get everyone to be safe
     for (FiCaSchedulerNode node : nodes) {
-      cs.allocateContainersToNode(node);
+      boolean dryrun = CapacityScheduler.dryrunForUpcomingAllocation(node);
+      if (dryrun) {
+        cs.allocateContainersToNode(node, true);
+      }
+
+      cs.allocateContainersToNode(node, false);
     }
     try {
       Thread.sleep(cs.getAsyncScheduleInterval());
@@ -844,6 +876,10 @@ private synchronized void doneApplicationAttempt(
     LOG.info("Application Attempt " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
 
+    // Tell preemption manager to unmark containers if the app is on the
+    // demanding apps list.
+    preemptionManager.unmarkDemandingApp(applicationAttemptId);
+
     FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
@@ -1161,9 +1197,24 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
     schedulerHealth.updateSchedulerRunDetails(now, assignment
         .getAssignmentInformation().getAllocated(), assignment
        .getAssignmentInformation().getReserved());
-  }
+  }
+
+  private synchronized void killPreemptedContainers() {
+    Set<ContainerId> toKillContainerIds =
+        preemptionManager.pullContainersToKill();
+
+    for (ContainerId id : toKillContainerIds) {
+      killContainer(getRMContainer(id));
+    }
+  }
 
-  private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+  @VisibleForTesting
+  public synchronized void allocateContainersToNode(FiCaSchedulerNode node,
+      boolean dryrun) {
+    if (!dryrun) {
+      killPreemptedContainers();
+    }
+
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {
       return;
@@ -1197,25 +1248,33 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
           // resources, should consider labeled resources as well.
          new ResourceLimits(labelManager.getResourceByLabel(
               RMNodeLabelsManager.NO_LABEL, clusterResource)),
-          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, dryrun);
 
       if (assignment.isFulfilledReservation()) {
-        CSAssignment tmp =
-            new CSAssignment(reservedContainer.getReservedResource(),
-                assignment.getType());
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-            reservedContainer.getReservedResource());
-        tmp.getAssignmentInformation().addAllocationDetails(
-            reservedContainer.getContainerId(), queue.getQueuePath());
-        tmp.getAssignmentInformation().incrAllocations();
-        updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
-        schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+        if (!dryrun) {
+          CSAssignment tmp = new CSAssignment(
+              reservedContainer.getReservedResource(), assignment.getType());
+          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+              reservedContainer.getReservedResource());
+          tmp.getAssignmentInformation().addAllocationDetails(
+              reservedContainer.getContainerId(), queue.getQueuePath());
+          tmp.getAssignmentInformation().incrAllocations();
+          updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
+          schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+        }
       }
     }
 
+    // TODO: Should check whether we can allocate resources if the reserved
+    // container could be preempted. The current logic cannot preempt reserved
+    // containers. One possible approach:
+    // if the reserved container is from a debtor queue, set the reserved
+    // container to null temporarily; if we can allocate resources to a
+    // demanding app after unreserving the container, drop the reservation
+    // immediately.
+
     // Try to schedule more if there are no reservations to fulfill
     if (node.getReservedContainer() == null) {
       if (calculator.computeAvailableContainers(node.getAvailableResource(),
-          minimumAllocation) > 0) {
+          minimumAllocation) > 0 || dryrun) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to schedule on node: " + node.getNodeName()
               + ", available: " + node.getAvailableResource());
@@ -1228,10 +1287,12 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
           // resources, should consider labeled resources as well.
          new ResourceLimits(labelManager.getResourceByLabel(
               RMNodeLabelsManager.NO_LABEL, clusterResource)),
-          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, dryrun);
         if (Resources.greaterThan(calculator, clusterResource,
             assignment.getResource(), Resources.none())) {
-          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+          if (!dryrun) {
+            updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+          }
           return;
         }
@@ -1313,12 +1374,19 @@ public void handle(SchedulerEvent event) {
       break;
     case NODE_UPDATE:
     {
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+      NodeUpdateSchedulerEvent nodeUpdatedEvent =
+          (NodeUpdateSchedulerEvent) event;
       RMNode node = nodeUpdatedEvent.getRMNode();
       setLastNodeUpdateTime(Time.now());
       nodeUpdate(node);
       if (!scheduleAsynchronously) {
-        allocateContainersToNode(getNode(node.getNodeID()));
+        FiCaSchedulerNode schedulerNode = getNode(node.getNodeID());
+        boolean dryrun = CapacityScheduler.dryrunForUpcomingAllocation(schedulerNode);
+        if (dryrun) {
+          allocateContainersToNode(schedulerNode, true);
+        }
+
+        allocateContainersToNode(schedulerNode, false);
       }
     }
     break;
@@ -1493,6 +1561,8 @@ protected synchronized void completedContainer(RMContainer rmContainer,
       return;
     }
 
+    // tell preemptionManager to unmark the container if it is on the
+    // preemption list
+    preemptionManager.unmarkToPreemptContainer(rmContainer.getContainerId());
     Container container = rmContainer.getContainer();
 
     // Get the application for the finished container
@@ -2015,4 +2085,8 @@ public void updateApplicationPriority(Priority newPriority,
         + rmApp.getQueue() + " for application: " + applicationId
         + " for the user: " + rmApp.getUser());
   }
+
+  public PreemptionManager getPreemptionManager() {
+    return preemptionManager;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index e5ac072..0c22ab4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -810,7 +810,13 @@ private void handleExcessReservedContainer(Resource clusterResource,
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, boolean dryrun) {
+    // Overwrite the resource limit to INF when doing a preemption check
+    // (dryrun = true); we will check the calculated max-preemptable resource,
+    // which already considers the parents' guaranteed/max resources.
+ if (dryrun) { + currentResourceLimits.setLimit(Resources.unbounded()); + } updateCurrentResourceLimits(currentResourceLimits, clusterResource); if (LOG.isDebugEnabled()) { @@ -824,11 +830,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); synchronized (application) { - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, reservedContainer); - handleExcessReservedContainer(clusterResource, assignment, node, - application); + CSAssignment assignment = application.assignContainers(clusterResource, + node, currentResourceLimits, schedulingMode, reservedContainer, + dryrun); + if (!dryrun) { + handleExcessReservedContainer(clusterResource, assignment, node, + application); + } return assignment; } } @@ -875,7 +883,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Try to schedule CSAssignment assignment = application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, null); + currentResourceLimits, schedulingMode, null, dryrun); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " @@ -885,22 +893,26 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Did we schedule or reserve a container? Resource assigned = assignment.getResource(); - - handleExcessReservedContainer(clusterResource, assignment, node, - application); + + if (!dryrun) { + handleExcessReservedContainer(clusterResource, assignment, node, + application); + } if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { - // Get reserved or allocated container from application - RMContainer reservedOrAllocatedRMContainer = - application.getRMContainer(assignment.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId()); - - // Book-keeping - // Note: Update headroom to account for current allocation too... - allocateResource(clusterResource, application, assigned, - node.getPartition(), reservedOrAllocatedRMContainer, - assignment.isIncreasedAllocation()); + if (!dryrun) { + // Get reserved or allocated container from application + RMContainer reservedOrAllocatedRMContainer = application.getRMContainer( + assignment.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId()); + + // Book-keeping + // Note: Update headroom to account for current allocation too... 
+ allocateResource(clusterResource, application, assigned, + node.getPartition(), reservedOrAllocatedRMContainer, + assignment.isIncreasedAllocation()); + } // Done return assignment; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index badab72..8c70706 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -381,7 +381,7 @@ private synchronized void removeApplication(ApplicationId applicationId, @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, boolean dryrun) { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { @@ -402,100 +402,131 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + Resource nodeAvailClone = Resources.clone(node.getAvailableResource()); - while (canAssign(clusterResource, node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to assign containers to child-queue of " - + getQueueName()); - } - - // Are we over maximum-capacity for this queue? 
-      // This will also consider parent's limits and also continuous reservation
-      // looking
-      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          resourceLimits, Resources.createResource(
-              getMetrics().getReservedMB(), getMetrics()
-                  .getReservedVirtualCores()), schedulingMode)) {
-        break;
-      }
-
-      // Schedule
-      CSAssignment assignedToChild =
-          assignContainersToChildQueues(clusterResource, node, resourceLimits,
-              schedulingMode);
-      assignment.setType(assignedToChild.getType());
-
-      // Done if no child-queue assigned anything
-      if (Resources.greaterThan(
-          resourceCalculator, clusterResource,
-          assignedToChild.getResource(), Resources.none())) {
-        // Track resource utilization for the parent-queue
-        super.allocateResource(clusterResource, assignedToChild.getResource(),
-            node.getPartition(), assignedToChild.isIncreasedAllocation());
-
-        // Track resource utilization in this pass of the scheduler
-        Resources
-            .addTo(assignment.getResource(), assignedToChild.getResource());
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-            assignedToChild.getAssignmentInformation().getAllocated());
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-            assignedToChild.getAssignmentInformation().getReserved());
-        assignment.getAssignmentInformation().incrAllocations(
-            assignedToChild.getAssignmentInformation().getNumAllocations());
-        assignment.getAssignmentInformation().incrReservations(
-            assignedToChild.getAssignmentInformation().getNumReservations());
-        assignment
-            .getAssignmentInformation()
-            .getAllocationDetails()
-            .addAll(
-                assignedToChild.getAssignmentInformation().getAllocationDetails());
-        assignment
-            .getAssignmentInformation()
-            .getReservationDetails()
-            .addAll(
-                assignedToChild.getAssignmentInformation()
-                    .getReservationDetails());
-        assignment.setIncreasedAllocation(assignedToChild
-            .isIncreasedAllocation());
-
-        LOG.info("assignedContainer" +
-            " queue=" + getQueueName() +
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + queueUsage.getUsed() +
-            " cluster=" + clusterResource);
+    try {
+      while (canAssign(clusterResource, node, dryrun)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to assign containers to child-queue of "
+              + getQueueName());
+        }
-      } else {
-        break;
-      }
+        // Are we over maximum-capacity for this queue?
+        // This will also consider parent's limits and also continuous
+        // reservation looking
+
+        // Don't check this if we're a parent queue: when we're doing
+        // preemption, we check the calculated max-preemptable resource,
+        // which already accounts for parents' guaranteed/max resources.
+        if (!dryrun && !super.canAssignToThisQueue(clusterResource, node.getPartition(),
+            resourceLimits,
+            Resources.createResource(getMetrics().getReservedMB(),
+                getMetrics().getReservedVirtualCores()),
+            schedulingMode)) {
+          break;
+        }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ParentQ=" + getQueueName()
-            + " assignedSoFarInThisIteration=" + assignment.getResource()
-            + " usedCapacity=" + getUsedCapacity()
-            + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
-      }
+        // Schedule
+        CSAssignment assignedToChild = assignContainersToChildQueues(
+            clusterResource, node, resourceLimits, schedulingMode, dryrun);
+        assignment.setType(assignedToChild.getType());
+
+        // Done if no child-queue assigned anything
+        if (Resources.greaterThan(resourceCalculator, clusterResource,
+            assignedToChild.getResource(), Resources.none())) {
+          if (!dryrun) {
+            // Track resource utilization for the parent-queue
+            super.allocateResource(clusterResource,
+                assignedToChild.getResource(), node.getPartition(),
+                assignedToChild.isIncreasedAllocation());
+
+            // Track resource utilization in this pass of the scheduler
+            Resources.addTo(assignment.getResource(),
+                assignedToChild.getResource());
+            Resources.addTo(
+                assignment.getAssignmentInformation().getAllocated(),
+                assignedToChild.getAssignmentInformation().getAllocated());
+            Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+                assignedToChild.getAssignmentInformation().getReserved());
+            assignment.getAssignmentInformation().incrAllocations(
+                assignedToChild.getAssignmentInformation().getNumAllocations());
+            assignment.getAssignmentInformation()
+                .incrReservations(assignedToChild.getAssignmentInformation()
+                    .getNumReservations());
+            assignment.getAssignmentInformation().getAllocationDetails()
+                .addAll(assignedToChild.getAssignmentInformation()
+                    .getAllocationDetails());
+            assignment.getAssignmentInformation().getReservationDetails()
+                .addAll(assignedToChild.getAssignmentInformation()
+                    .getReservationDetails());
+            assignment.setIncreasedAllocation(
+                assignedToChild.isIncreasedAllocation());
+
+            LOG.info("assignedContainer" + " queue=" + getQueueName()
+                + " usedCapacity=" + getUsedCapacity()
+                + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity()
+                + " used=" + queueUsage.getUsed() + " cluster="
+                + clusterResource);
+          } else {
+            // Update the node's available resource if we "allocated" anything
+            if (CapacitySchedulerConfiguration.ROOT.equals(this.getQueueName())) {
+              Resource remainingResource = Resources.subtract(
+                  node.getAvailableResource(), assignedToChild.getResource());
+              if (Resources.fitsIn(resourceCalculator, clusterResource,
+                  Resources.none(), remainingResource)) {
+                // We don't need preemption to allocate this container, so we
+                // won't check other containers.
+                break;
+              } else {
+                // Since we "borrow" some resources from other containers, set
+                // the available resource to 0 so that the next container
+                // allocation preempts the correct amount.
+                node.setAvailableResource(Resources.none());
+              }
+            }
+          }
+        } else {
+          break;
+        }
-      // Do not assign more than one container if this isn't the root queue
-      // or if we've already assigned an off-switch container
-      if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
         if (LOG.isDebugEnabled()) {
-          if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
-            LOG.debug("Not assigning more than one off-switch container," +
-                " assignments so far: " + assignment);
+          LOG.debug("ParentQ=" + getQueueName()
+              + " assignedSoFarInThisIteration=" + assignment.getResource()
+              + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+              + getAbsoluteUsedCapacity());
+        }
+
+        // Do not assign more than one container if this isn't the root queue
+        // or if we've already assigned an off-switch container
+        if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
+          if (LOG.isDebugEnabled()) {
+            if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
+              LOG.debug("Not assigning more than one off-switch container,"
+                  + " assignments so far: " + assignment);
+            }
           }
+          break;
         }
-        break;
       }
-    }
+    } finally {
+      // Restore the node's available resource if we're in dryrun mode.
+      if (dryrun
+          && CapacitySchedulerConfiguration.ROOT.equals(this.getQueueName())) {
+        node.setAvailableResource(nodeAvailClone);
+      }
+    }

     return assignment;
   }

-  private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
-    return (node.getReservedContainer() == null) &&
-        Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
-            node.getAvailableResource(), minimumAllocation);
+  private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node,
+      boolean dryrun) {
+    return (node.getReservedContainer() == null)
+        // Dryrun doesn't check whether the node's available resource can fit
+        // at least one container; we assume some containers could be
+        // preempted from this node.
+        && (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+            node.getAvailableResource(), minimumAllocation) || dryrun);
   }
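The finally block at the root queue is what makes the dryrun safe: the dryrun pass may clamp the node's available resource to zero mid-iteration, and the clone taken before the loop is put back once the pass completes. A minimal standalone sketch of that save/restore pattern (the Node type and all names here are illustrative stand-ins, not YARN APIs):

public class DryrunNodeStateSketch {
  /** Simplified stand-in for a scheduler node's mutable available memory. */
  static class Node {
    long availableMb;
    Node(long availableMb) { this.availableMb = availableMb; }
  }

  /**
   * Walk a list of asks against the node. In dryrun mode an ask that exceeds
   * what is free clamps availability to 0 (it would be satisfied by
   * preemption), and the snapshot taken up front is restored in finally so
   * real scheduler state is never disturbed.
   */
  static void dryrunPass(Node node, long[] asksMb) {
    final long snapshot = node.availableMb;   // clone before mutating
    try {
      for (long ask : asksMb) {
        if (ask <= node.availableMb) {
          node.availableMb -= ask;            // fits without preemption
        } else {
          node.availableMb = 0;               // "borrowed": later asks must preempt
        }
      }
    } finally {
      node.availableMb = snapshot;            // dryrun leaves no trace
    }
  }

  public static void main(String[] args) {
    Node n = new Node(4096);
    dryrunPass(n, new long[] {3000, 2000, 1000});
    System.out.println("available after dryrun = " + n.availableMb); // 4096
  }
}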
 private ResourceLimits getResourceLimitsOfChild(CSQueue child,
@@ -550,7 +581,7 @@ private ResourceLimits getResourceLimitsOfChild,
   private synchronized CSAssignment assignContainersToChildQueues(
       Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, boolean dryrun) {
     CSAssignment assignment =
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -570,7 +601,7 @@ private synchronized CSAssignment assignContainersToChildQueues(
           getResourceLimitsOfChild(childQueue, cluster, limits);

       assignment = childQueue.assignContainers(cluster, node,
-          childLimits, schedulingMode);
+          childLimits, schedulingMode, dryrun);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " +
@@ -581,16 +612,18 @@
       if (Resources.greaterThan(
             resourceCalculator, cluster,
             assignment.getResource(), Resources.none())) {
-        // Only update childQueues when we doing non-partitioned node
-        // allocation.
-        if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
-          // Remove and re-insert to sort
-          iter.remove();
-          LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
-              + " stats: " + childQueue);
-          childQueues.add(childQueue);
-          if (LOG.isDebugEnabled()) {
-            printChildQueues();
+        if (!dryrun) {
+          // Only re-sort childQueues for real (non-dryrun) allocations on
+          // non-partitioned nodes.
+          if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
+            // Remove and re-insert to sort
+            iter.remove();
+            LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
+                + " stats: " + childQueue);
+            childQueues.add(childQueue);
+            if (LOG.isDebugEnabled()) {
+              printChildQueues();
+            }
           }
         }
         break;
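The remove-and-re-add dance above is the standard way to keep childQueues ordered once an assignment changes a queue's used capacity: ordered collections only sort on insertion, so an element whose sort key is about to change must be pulled out first. A self-contained illustration with a TreeSet (QueueStub is a made-up type for this sketch):

import java.util.Comparator;
import java.util.TreeSet;

public class ResortOnAssignSketch {
  static final class QueueStub {
    final String name;
    double usedCapacity;
    QueueStub(String name, double usedCapacity) {
      this.name = name;
      this.usedCapacity = usedCapacity;
    }
  }

  public static void main(String[] args) {
    // Least-used queue first; name breaks ties so distinct queues never collide.
    TreeSet<QueueStub> childQueues = new TreeSet<>(
        Comparator.comparingDouble((QueueStub q) -> q.usedCapacity)
            .thenComparing(q -> q.name));
    QueueStub a = new QueueStub("a", 0.1);
    QueueStub b = new QueueStub("b", 0.5);
    childQueues.add(a);
    childQueues.add(b);

    // An assignment raised a's usage. Mutating in place would corrupt the
    // TreeSet's ordering, so remove, update, and re-insert -- the same
    // reason ParentQueue re-sorts after a child assignment.
    childQueues.remove(a);
    a.usedCapacity = 0.9;
    childQueues.add(a);

    System.out.println(childQueues.first().name); // "b" is now least used
  }
}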
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index ee01bd1..527d95f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -43,17 +44,20 @@
   FiCaSchedulerApp application;
   final ResourceCalculator rc;
   final RMContext rmContext;
-
+  PreemptionManager preemptionManager = null;
+
   public AbstractContainerAllocator(FiCaSchedulerApp application,
-      ResourceCalculator rc, RMContext rmContext) {
+      ResourceCalculator rc, RMContext rmContext,
+      PreemptionManager preemptionMgr) {
     this.application = application;
     this.rc = rc;
     this.rmContext = rmContext;
+    this.preemptionManager = preemptionMgr;
   }

   protected CSAssignment getCSAssignmentFromAllocateResult(
       Resource clusterResource, ContainerAllocation result,
-      RMContainer rmContainer) {
+      RMContainer rmContainer, boolean dryrun) {
     // Handle skipped
     boolean skipped =
         (result.getAllocationState() == AllocationState.APP_SKIPPED);
@@ -72,40 +76,42 @@ protected CSAssignment getCSAssignmentFromAllocateResult(
       assignment.setResource(allocatedResource);
       assignment.setType(result.getContainerNodeType());

-      if (result.getAllocationState() == AllocationState.RESERVED) {
-        // This is a reserved container
-        LOG.info("Reserved container " + " application="
-            + application.getApplicationId() + " resource=" + allocatedResource
-            + " queue=" + this.toString() + " cluster=" + clusterResource);
-        assignment.getAssignmentInformation().addReservationDetails(
-            updatedContainer.getId(),
-            application.getCSLeafQueue().getQueuePath());
-        assignment.getAssignmentInformation().incrReservations();
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-            allocatedResource);
-      } else if (result.getAllocationState() == AllocationState.ALLOCATED){
-        // This is a new container
-        // Inform the ordering policy
-        LOG.info("assignedContainer" + " application attempt="
-            + application.getApplicationAttemptId() + " container="
-            + updatedContainer.getId() + " queue=" + this + " clusterResource="
-            + clusterResource + " type=" + assignment.getType());
+      if (!dryrun) {
+        // Only update the following fields if this is not a dryrun
+        if (result.getAllocationState() == AllocationState.RESERVED) {
+          // This is a reserved container
+          LOG.info("Reserved container " + " application="
+              + application.getApplicationId() + " resource="
+              + allocatedResource + " queue=" + this.toString() + " cluster="
+              + clusterResource);
+          assignment.getAssignmentInformation().addReservationDetails(
+              updatedContainer.getId(),
+              application.getCSLeafQueue().getQueuePath());
+          assignment.getAssignmentInformation().incrReservations();
+          Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+              allocatedResource);
+        } else if (result.getAllocationState() == AllocationState.ALLOCATED) {
+          // This is a new container
+          // Inform the ordering policy
+          LOG.info("assignedContainer" + " application attempt="
+              + application.getApplicationAttemptId() + " container="
+              + updatedContainer.getId() + " queue=" + this
+              + " clusterResource=" + clusterResource);
+
+          application.getCSLeafQueue().getOrderingPolicy().containerAllocated(
+              application,
+              application.getRMContainer(updatedContainer.getId()));

-        application
-            .getCSLeafQueue()
-            .getOrderingPolicy()
-            .containerAllocated(application,
-                application.getRMContainer(updatedContainer.getId()));
+          assignment.getAssignmentInformation().addAllocationDetails(
+              updatedContainer.getId(),
+              application.getCSLeafQueue().getQueuePath());
+          assignment.getAssignmentInformation().incrAllocations();
+          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+              allocatedResource);

-        assignment.getAssignmentInformation().addAllocationDetails(
-            updatedContainer.getId(),
-            application.getCSLeafQueue().getQueuePath());
-        assignment.getAssignmentInformation().incrAllocations();
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-            allocatedResource);
-
-        if (rmContainer != null) {
-          assignment.setFulfilledReservation(true);
+          if (rmContainer != null) {
+            assignment.setFulfilledReservation(true);
+          }
         }
       }
     }
@@ -127,5 +133,6 @@ protected CSAssignment getCSAssignmentFromAllocateResult(
    */
   public abstract CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer);
+      ResourceLimits resourceLimits, RMContainer reservedContainer,
+      boolean dryrun);
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 3be8e0e..bcac99e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -35,28 +36,29 @@ AbstractContainerAllocator regularContainerAllocator; public ContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + ResourceCalculator rc, RMContext rmContext, PreemptionManager preemptionMgr) { + super(application, rc, rmContext, preemptionMgr); increaseContainerAllocator = new IncreaseContainerAllocator(application, rc, rmContext); regularContainerAllocator = - new RegularContainerAllocator(application, rc, rmContext); + new RegularContainerAllocator(application, rc, rmContext, preemptionMgr); } @Override public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer) { + ResourceLimits resourceLimits, RMContainer reservedContainer, + boolean dryrun) { if (reservedContainer != null) { if (reservedContainer.getState() == RMContainerState.RESERVED) { // It's a regular container return regularContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + node, schedulingMode, resourceLimits, reservedContainer, dryrun); } else { // It's a increase container return increaseContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + node, schedulingMode, resourceLimits, reservedContainer, dryrun); } } else { /* @@ -65,14 +67,14 @@ public CSAssignment assignContainers(Resource clusterResource, */ CSAssignment assign = increaseContainerAllocator.assignContainers(clusterResource, node, - schedulingMode, resourceLimits, null); + schedulingMode, resourceLimits, null, dryrun); if (Resources.greaterThan(rc, clusterResource, assign.getResource(), Resources.none())) { return assign; } return regularContainerAllocator.assignContainers(clusterResource, node, - schedulingMode, resourceLimits, null); + schedulingMode, resourceLimits, null, dryrun); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 16cf6d3..18abb55 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -51,7 +51,7 @@ public IncreaseContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + super(application, rc, rmContext, null); } /** @@ -173,7 +173,13 @@ private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node, @Override public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer) { + ResourceLimits resourceLimits, RMContainer reservedContainer, + boolean dryrun) { + // TODO, IncreaseContainerAllocator doesn't support dryrun for now. + if (dryrun) { + return CSAssignment.SKIP_ASSIGNMENT; + } + AppSchedulingInfo sinfo = application.getAppSchedulingInfo(); NodeId nodeId = node.getNodeID(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 6bd4d17..eedb1c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -33,9 +34,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.ResourceRequirement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -50,10 +54,11 @@ private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + ResourceCalculator rc, RMContext rmContext, + PreemptionManager preemptionMgr) { + super(application, rc, rmContext, preemptionMgr); } private boolean checkHeadroom(Resource clusterResource, @@ -178,7 +183,7 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, ContainerAllocation preAllocation(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, boolean dryrun) { ContainerAllocation result; if (null == reservedContainer) { // pre-check when allocating new container @@ -200,7 +205,7 @@ ContainerAllocation preAllocation(Resource clusterResource, // Try to allocate containers on node result = assignContainersOnNode(clusterResource, node, priority, - reservedContainer, schedulingMode, resourceLimits); + reservedContainer, schedulingMode, resourceLimits, dryrun); if (null == reservedContainer) { if (result.state == AllocationState.PRIORITY_SKIPPED) { @@ -283,11 +288,12 @@ private boolean canAssign(Priority priority, FiCaSchedulerNode node, private ContainerAllocation assignNodeLocalContainers( Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits, + boolean dryrun) { if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - schedulingMode, currentResoureLimits); + schedulingMode, currentResoureLimits, dryrun); } // Skip node-local request, go to rack-local request @@ -297,11 +303,12 @@ private ContainerAllocation assignNodeLocalContainers( private ContainerAllocation assignRackLocalContainers( Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits, + boolean dryrun) { if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - schedulingMode, currentResoureLimits); + schedulingMode, currentResoureLimits, dryrun); } // Skip rack-local request, go to off-switch request @@ -311,11 +318,12 @@ private ContainerAllocation assignRackLocalContainers( private ContainerAllocation assignOffSwitchContainers( Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits, + boolean dryrun) { if (canAssign(priority, node, NodeType.OFF_SWITCH, 
reservedContainer)) {
       return assignContainer(clusterResource, node, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          schedulingMode, currentResoureLimits);
+          schedulingMode, currentResoureLimits, dryrun);
     }

     return ContainerAllocation.APP_SKIPPED;
@@ -323,7 +331,7 @@ private ContainerAllocation assignOffSwitchContainers(
   private ContainerAllocation assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits, boolean dryrun) {

     ContainerAllocation allocation;
@@ -336,7 +344,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
       allocation =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
               node, priority, reservedContainer, schedulingMode,
-              currentResoureLimits);
+              currentResoureLimits, dryrun);
       if (Resources.greaterThan(rc, clusterResource,
           allocation.getResourceToBeAllocated(), Resources.none())) {
         allocation.requestNodeType = requestType;
@@ -359,7 +367,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
       allocation =
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
               node, priority, reservedContainer, schedulingMode,
-              currentResoureLimits);
+              currentResoureLimits, dryrun);
       if (Resources.greaterThan(rc, clusterResource,
           allocation.getResourceToBeAllocated(), Resources.none())) {
         allocation.requestNodeType = requestType;
@@ -382,7 +390,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
       allocation =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
              node, priority, reservedContainer, schedulingMode,
-              currentResoureLimits);
+              currentResoureLimits, dryrun);
       allocation.requestNodeType = requestType;

       // When a returned allocation is LOCALITY_SKIPPED, since we're in
@@ -396,11 +404,80 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,

     return ContainerAllocation.PRIORITY_SKIPPED;
   }
+
+  private boolean tryToPreemptWhenDryrun(ResourceRequest request, Resource cluster,
+      Resource nodeAvailable, SchedulerNode node) {
+    if (!checkExcessivePreemption(request, cluster, nodeAvailable,
+        node.getPartition())) {
+      return false;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("check excessive preemption passed for queue=" + application
+          .getQueue());
+    }
+
+    return preemptionManager.tryToPreempt(
+        new ResourceRequirement(application, request.getCapability(),
+            request.getPriority(), request.getResourceName()),
+        node.getRunningContainers(), cluster, node.getPartition());
+  }
+
+  private boolean checkExcessivePreemption(ResourceRequest request,
+      Resource cluster, Resource nodeAvailable, String nodePartition) {
+    // Check whether the queue this app belongs to can preempt anything.
+    if (!preemptionManager
+        .canQueuePreemptResourceFromOthers(application.getQueueName(),
+            nodePartition)) {
+      return false;
+    }
+
+    /*
+     * Two criteria for excessive preemption:
+     * - #reserved + #preempt(*) > #pending(*)
+     * - #preempt(resourceName) > min(#pending(*), #pending(resourceName), #preemptable)
+     *
+     * Notes:
+     * - #preemptable is the number of containers this application can preempt.
+     */
+    int numReserved =
+        application.getNumReservedContainers(request.getPriority());
+    int numPending = application.getTotalRequiredResources(request.getPriority());
+    int numWillPreemptFromOther = Math.round(Resources.divide(rc, cluster,
+        preemptionManager.getTotalResourcesWillBePreemptedByApp(
+            application.getApplicationAttemptId(), request.getPriority(),
+            ResourceRequest.ANY),
+        request.getCapability()));
+
+    if (numReserved + numWillPreemptFromOther > numPending) {
+      return false;
+    }
+
+    if (!request.getResourceName().equals(ResourceRequest.ANY)) {
+      numPending =
+          Math.min(numPending,
+              Math.round(Resources.divide(
+                  rc, cluster, application.getAppAttemptResourceUsage()
+                      .getPending(request.getResourceName()),
+                  request.getCapability())));
+      numWillPreemptFromOther = Math.round(Resources.divide(rc, cluster,
+          preemptionManager.getTotalResourcesWillBePreemptedByApp(
+              application.getApplicationAttemptId(), request.getPriority(),
+              request.getResourceName()),
+          request.getCapability()));
+
+      if (numWillPreemptFromOther > numPending) {
+        return false;
+      }
+    }
+
+    return true;
+  }
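The bounds above are easy to sanity-check with concrete numbers. A standalone sketch of the first criterion's container arithmetic (the names are illustrative; the real code derives these counts from Resource objects via the ResourceCalculator):

public class ExcessivePreemptionCheckSketch {
  /**
   * Mirror of the rejection logic: a new preemption is excessive when the
   * containers already reserved plus those already marked to be preempted
   * on the app's behalf would exceed what the app still has pending.
   */
  static boolean wouldBeExcessive(int numReserved, int numPending,
      int numAlreadyMarkedToPreempt) {
    return numReserved + numAlreadyMarkedToPreempt > numPending;
  }

  public static void main(String[] args) {
    // App has 5 pending containers, 2 reserved, 2 already being preempted
    // elsewhere on its behalf: one more preemption is still allowed ...
    System.out.println(!wouldBeExcessive(2, 5, 2)); // true

    // ... but with 3 reserved and 3 already marked, 3 + 3 exceeds the 5
    // pending containers, so a further mark is rejected.
    System.out.println(wouldBeExcessive(3, 5, 3));  // true
  }
}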
 private ContainerAllocation assignContainer(Resource clusterResource,
     FiCaSchedulerNode node, Priority priority, ResourceRequest request,
     NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
+      ResourceLimits currentResoureLimits, boolean dryrun) {
     lastResourceRequest = request;

     if (LOG.isDebugEnabled()) {
@@ -433,8 +510,11 @@ private ContainerAllocation assignContainer(Resource clusterResource,
       return ContainerAllocation.LOCALITY_SKIPPED;
     }

-    assert Resources.greaterThan(
-        rc, clusterResource, available, Resources.none());
+    if (!dryrun) {
+      // Only assert this condition if we're not in dryrun
+      assert Resources
+          .greaterThan(rc, clusterResource, available, Resources.none());
+    }

     boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
         priority, capability);
@@ -442,6 +522,14 @@ private ContainerAllocation assignContainer(Resource clusterResource,
     // Can we allocate a container on this node?
     int availableContainers =
         rc.computeAvailableContainers(available, capability);
+    if (dryrun) {
+      if (availableContainers > 0) {
+        // This node has enough resource; we don't need to preempt any
+        // container for the next allocation, so return LOCALITY_SKIPPED to
+        // terminate the dryrun
+        return ContainerAllocation.LOCALITY_SKIPPED;
+      }
+    }
+
     // How much need to unreserve equals to:
     // max(required - headroom, amountNeedUnreserve)
@@ -458,7 +546,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
     boolean reservationsContinueLooking =
         application.getCSLeafQueue().getReservationContinueLooking();

-    if (availableContainers > 0) {
+    if (availableContainers > 0 || dryrun) {
       // Allocate...
       // We will only do continuous reservation when this is not allocated from
       // reserved container
@@ -492,6 +580,14 @@
           }
         }
       }
+
+      // In dryrun, check whether we need to preempt
+      if (dryrun) {
+        if (!tryToPreemptWhenDryrun(request, clusterResource,
+            node.getAvailableResource(), node)) {
+          return ContainerAllocation.LOCALITY_SKIPPED;
+        }
+      }

       ContainerAllocation result =
           new ContainerAllocation(unreservedContainer, request.getCapability(),
@@ -515,6 +611,13 @@
           return ContainerAllocation.LOCALITY_SKIPPED;
         }
       }
+
+      if (dryrun) {
+        if (!tryToPreemptWhenDryrun(request, clusterResource,
+            node.getAvailableResource(), node)) {
+          return ContainerAllocation.LOCALITY_SKIPPED;
+        }
+      }

       ContainerAllocation result =
           new ContainerAllocation(null, request.getCapability(),
@@ -667,10 +770,14 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
   private ContainerAllocation allocate(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, Priority priority,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, boolean dryrun) {
     ContainerAllocation result =
         preAllocation(clusterResource, node, schedulingMode, resourceLimits,
-            priority, reservedContainer);
+            priority, reservedContainer, dryrun);
+
+    if (dryrun) {
+      return result;
+    }

     if (AllocationState.ALLOCATED == result.state
         || AllocationState.RESERVED == result.state) {
@@ -686,7 +793,7 @@
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, boolean dryrun) {
     if (reservedContainer == null) {
       // Check if application needs more resource, skip if it doesn't need more.
       if (!application.hasPendingResourceRequest(rc,
@@ -703,25 +810,25 @@ public CSAssignment assignContainers(Resource clusterResource,
       for (Priority priority : application.getPriorities()) {
         ContainerAllocation result =
             allocate(clusterResource, node, schedulingMode, resourceLimits,
-                priority, null);
+                priority, null, dryrun);

         AllocationState allocationState = result.getAllocationState();
         if (allocationState == AllocationState.PRIORITY_SKIPPED) {
           continue;
         }
         return getCSAssignmentFromAllocateResult(clusterResource, result,
-            null);
+            null, dryrun);
       }

       // We will reach here if we skipped all priorities of the app, so we will
       // skip the app.
return CSAssignment.SKIP_ASSIGNMENT; } else { - ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, - reservedContainer.getReservedPriority(), reservedContainer); + ContainerAllocation result = allocate(clusterResource, node, + schedulingMode, resourceLimits, + reservedContainer.getReservedPriority(), reservedContainer, dryrun); return getCSAssignmentFromAllocateResult(clusterResource, result, - reservedContainer); + reservedContainer, dryrun); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueuePartitionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueuePartitionEntity.java new file mode 100644 index 0000000..e21a729 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueuePartitionEntity.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import org.apache.hadoop.yarn.api.records.Resource; + +public class PreemptableQueuePartitionEntity { + private final String queueName; + private final String partitionName; + private final Resource ideal; + private final Resource preemptable; + + public PreemptableQueuePartitionEntity(String queueName, String partitionName, + Resource ideal, Resource preemptable) { + this.queueName = queueName; + this.partitionName = partitionName; + this.ideal = ideal; + this.preemptable = preemptable; + } + + public String getQueueName() { + return queueName; + } + + public String getPartitionName() { + return partitionName; + } + + public Resource getIdeal() { + return ideal; + } + + public Resource getPreemptable() { + return preemptable; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java new file mode 100644 index 0000000..7e23c35 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -0,0 +1,651 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+public class PreemptionManager {
+  enum PreemptionType {
+    // Currently only DIFFERENT_QUEUE is used.
+    DIFFERENT_QUEUE,
+    SAME_QUEUE_DIFFERENT_USER,
+    SAME_QUEUE_SAME_USER
+  }
+
+  static class ToPreemptContainer {
+    RMContainer container;
+    ResourceRequirement resourceRequirement;
+    long startTimestamp;
+    long lastListedTimestamp = Long.MAX_VALUE;
+    PreemptionType preemptionType;
+    PreemptableEntityMeasure containerQueueMeasure;
+    PreemptableEntityMeasure demandingQueueMeasure;
+
+    public ToPreemptContainer(RMContainer container, long startTimestamp,
+        PreemptionType preemptionType, ResourceRequirement resourceRequirement,
+        PreemptableEntityMeasure containerQueue, PreemptableEntityMeasure demandingQueue) {
+      this.container = container;
+      this.startTimestamp = startTimestamp;
+      this.preemptionType = preemptionType;
+      this.resourceRequirement = resourceRequirement;
+      this.containerQueueMeasure = containerQueue;
+      this.demandingQueueMeasure = demandingQueue;
+    }
+  }
+
+  /*
+   * One preemptable entity; this could be a queue, a user or an application.
+   * User/Application will be used only when intra-queue preemption is
+   * supported.
+   */
+  static class PreemptableEntityMeasure {
+    String entityKey;
+
+    // PlaceHolder: String parentKey;
+    // PlaceHolder: String user;
+    // PlaceHolder: String appId;
+
+    Resource ideal;
+
+    // When this is a debtor (debtor == true), maxPreemptable means how much
+    // resource needs to be taken from this queue.
+    // When this is not a debtor, maxPreemptable means how much resource this
+    // queue needs to get.
+    Resource maxPreemptable;
+    Resource totalMarkedPreempted;
+
+    // Whether resources should be preempted from this entity
+    boolean debtor = false;
+
+    private Resource totalMarkedPreemptedForDryRun;
+    private long timestamp;
+
+    public PreemptableEntityMeasure(String entityKey) {
+      this.entityKey = entityKey;
+      this.totalMarkedPreempted = Resources.createResource(0);
+    }
+
+    public Resource getTotalMarkedPreemptedForDryRun(long timestamp) {
+      if (this.timestamp != timestamp) {
+        totalMarkedPreemptedForDryRun = Resources.clone(totalMarkedPreempted);
+        this.timestamp = timestamp;
+      }
+      return totalMarkedPreemptedForDryRun;
+    }
+  }
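getTotalMarkedPreemptedForDryRun implements a small but useful trick: instead of clearing working state at the start of each scheduling cycle, the working copy is lazily re-cloned the first time a new cycle's timestamp is seen. A generic sketch of the same idiom (names invented for illustration):

public class PerCycleSnapshotSketch {
  /** A committed value plus a scratch copy that resets itself per cycle. */
  static class CycleScoped {
    private long committed = 0;       // durable, survives cycles
    private long scratch;             // working copy for the current cycle
    private long seenStamp = Long.MIN_VALUE;

    /** First call with a new stamp re-clones; later calls reuse the copy. */
    long scratchFor(long cycleStamp) {
      if (seenStamp != cycleStamp) {
        scratch = committed;          // "clone" the committed state
        seenStamp = cycleStamp;
      }
      return scratch;
    }

    void addToScratch(long cycleStamp, long delta) {
      scratch = scratchFor(cycleStamp) + delta;
    }

    void commit(long delta) { committed += delta; }
  }

  public static void main(String[] args) {
    CycleScoped marked = new CycleScoped();
    marked.commit(1024);
    long cycle1 = System.nanoTime();
    marked.addToScratch(cycle1, 512);              // tentative, this cycle only
    System.out.println(marked.scratchFor(cycle1)); // 1536
    long cycle2 = System.nanoTime();
    System.out.println(marked.scratchFor(cycle2)); // back to 1024
  }
}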
+  class PreemptableEntitiesManager {
+    private Map<String, PreemptableEntityMeasure> map = new HashMap<>();
+
+    PreemptableEntityMeasure getOrAddNew(String key) {
+      PreemptableEntityMeasure measure = map.get(key);
+      if (measure == null) {
+        measure = new PreemptableEntityMeasure(key);
+        map.put(key, measure);
+      }
+      return measure;
+    }
+
+    PreemptableEntityMeasure get(String key) {
+      return map.get(key);
+    }
+
+    public void updatePreemptableQueueEntity(String queue, String partition,
+        Resource ideal, Resource maxPreempt) {
+      String key = queue + "_" + partition;
+
+      PreemptableEntityMeasure measure = getOrAddNew(key);
+      measure.ideal = ideal;
+
+      // When maxPreempt is positive, it means this entity is a debtor
+      if (maxPreempt.getMemory() + maxPreempt.getVirtualCores() > 0) {
+        if (!measure.debtor) {
+          // The queue becomes a debtor: if any containers were marked to be
+          // preempted on behalf of apps in this queue, unmark them.
+          List<ContainerId> unmarkContainers = new ArrayList<>();
+          for (ToPreemptContainer c : preemptionRelationshipManager.toPreemptContainers
+              .values()) {
+            if (c.resourceRequirement.application.getQueueName()
+                .equals(queue)) {
+              unmarkContainers.add(c.container.getContainerId());
+            }
+          }
+
+          for (ContainerId id : unmarkContainers) {
+            preemptionRelationshipManager.unmarkToPreemptContainer(id);
+          }
+        }
+
+        measure.maxPreemptable = maxPreempt;
+        measure.debtor = true;
+      } else {
+        // The queue becomes a non-debtor: if any of this queue's own
+        // containers were marked to be preempted, unmark them.
+        List<ContainerId> unmarkContainers = new ArrayList<>();
+        for (ToPreemptContainer c : preemptionRelationshipManager.toPreemptContainers
+            .values()) {
+          if (c.container.getQueue().equals(queue)) {
+            unmarkContainers.add(c.container.getContainerId());
+          }
+        }
+
+        for (ContainerId id : unmarkContainers) {
+          preemptionRelationshipManager.unmarkToPreemptContainer(id);
+        }
+
+        measure.maxPreemptable = Resources.negate(maxPreempt);
+        measure.debtor = false;
+      }
+    }
+  }
+
+  static class DemandingApp {
+    ApplicationAttemptId appAttemptId;
+    SchedulerApplicationAttempt application;
+    // to-preempt containers for this app only
+    Map<ContainerId, ToPreemptContainer> toPreemptContainers;
+    // to-preempt resources: priority -> (resourceName -> Resource)
+    Map<Priority, Map<String, Resource>> toPreemptResources;
+    // container -> reference to the amount of resource marked for preemption,
+    // classified by priority and resourceName (references the Resource
+    // objects in the map above)
+    Map<ContainerId, Resource> containerIdToToPreemptResource;
+
+    public DemandingApp(ApplicationAttemptId appAttemptId,
+        SchedulerApplicationAttempt application) {
+      this.appAttemptId = appAttemptId;
+      this.application = application;
+
+      toPreemptContainers = new HashMap<>();
+      toPreemptResources = new HashMap<>();
+      containerIdToToPreemptResource = new HashMap<>();
+    }
+
+    private Resource getOrAddNewResource(Priority priority, String resourceName) {
+      if (!toPreemptResources.containsKey(priority)) {
+        toPreemptResources.put(priority, new HashMap<String, Resource>());
+      }
+      if (!toPreemptResources.get(priority).containsKey(resourceName)) {
+        toPreemptResources.get(priority).put(resourceName,
+            Resources.createResource(0));
+      }
+
+      return toPreemptResources.get(priority).get(resourceName);
+    }
+
+    void addToPreemptContainer(ToPreemptContainer container, Priority priority,
+        String resourceName) {
+      ContainerId containerId = container.container.getContainerId();
+
+      toPreemptContainers.put(containerId, container);
+
+      if (!resourceName.equals(ResourceRequest.ANY)) {
+        Resource resource = getOrAddNewResource(priority, resourceName);
+        containerIdToToPreemptResource.put(containerId, resource);
+        Resources.addTo(resource, container.container.getAllocatedResource());
+      }
+
+      // We also update the ANY resource for every to-preempt container
+      Resource resource = getOrAddNewResource(priority, ResourceRequest.ANY);
+      Resources.addTo(resource, container.container.getAllocatedResource());
+    }
+
+    void unmarkToPreemptContainer(ContainerId containerId) {
+      if (toPreemptContainers.containsKey(containerId)) {
+        ToPreemptContainer container =
+            toPreemptContainers.remove(containerId);
+
+        // Only requests whose resourceName != ANY need to deduct the
+        // location-specific resource
+        if (containerIdToToPreemptResource.containsKey(containerId)) {
+          Resources.subtractFrom(
+              containerIdToToPreemptResource
+                  .get(container.container.getContainerId()),
+              container.container.getAllocatedResource());
+        }
+
+        // Update the ANY resource for the to-preempt container
+        Resource resource = toPreemptResources.get(container.resourceRequirement.priority)
+            .get(ResourceRequest.ANY);
+        Resources.subtractFrom(resource,
+            container.container.getAllocatedResource());
+      }
+    }
+  }
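DemandingApp keeps a two-level ledger, priority -> (resourceName -> amount), and every marked container is counted once under its specific location and once under ANY, so both views stay queryable in O(1). A simplified sketch of that double bookkeeping (plain longs instead of Resource, invented names):

import java.util.HashMap;
import java.util.Map;

public class PerLocationLedgerSketch {
  static final String ANY = "*";
  // priority -> (resourceName -> marked amount)
  private final Map<Integer, Map<String, Long>> marked = new HashMap<>();

  void mark(int priority, String resourceName, long amount) {
    Map<String, Long> byName =
        marked.computeIfAbsent(priority, p -> new HashMap<>());
    if (!ANY.equals(resourceName)) {
      byName.merge(resourceName, amount, Long::sum); // location-specific bucket
    }
    byName.merge(ANY, amount, Long::sum);            // aggregate bucket
  }

  void unmark(int priority, String resourceName, long amount) {
    Map<String, Long> byName = marked.get(priority);
    if (byName == null) return;
    if (!ANY.equals(resourceName)) {
      byName.merge(resourceName, -amount, Long::sum);
    }
    byName.merge(ANY, -amount, Long::sum);
  }

  long markedFor(int priority, String resourceName) {
    return marked.getOrDefault(priority, new HashMap<>())
        .getOrDefault(resourceName, 0L);
  }

  public static void main(String[] args) {
    PerLocationLedgerSketch ledger = new PerLocationLedgerSketch();
    ledger.mark(1, "host-1", 2048);
    ledger.mark(1, ANY, 1024);
    System.out.println(ledger.markedFor(1, "host-1")); // 2048
    System.out.println(ledger.markedFor(1, ANY));      // 3072
    ledger.unmark(1, "host-1", 2048);
    System.out.println(ledger.markedFor(1, ANY));      // 1024
  }
}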
+  class PreemptionRelationshipManager {
+    Map<ContainerId, ToPreemptContainer> toPreemptContainers = new HashMap<>();
+    Map<ApplicationAttemptId, DemandingApp> demandingApps = new HashMap<>();
+
+    void addToPreemptContainer(ToPreemptContainer container,
+        ResourceRequirement resourceRequirement) {
+      toPreemptContainers.put(container.container.getContainerId(), container);
+
+      // Add or create demanding app if necessary
+      ApplicationAttemptId attemptId =
+          resourceRequirement.application.getApplicationAttemptId();
+      if (!demandingApps.containsKey(attemptId)) {
+        demandingApps.put(attemptId,
+            new DemandingApp(attemptId, resourceRequirement.application));
+      }
+      DemandingApp demandingApp = demandingApps.get(attemptId);
+      demandingApp.addToPreemptContainer(container,
+          resourceRequirement.priority, resourceRequirement.resourceName);
+
+      // Update the marked-preempted resources
+      Resource res = container.container.getAllocatedResource();
+      Resources.addTo(container.containerQueueMeasure.totalMarkedPreempted,
+          res);
+      Resources.addTo(container.demandingQueueMeasure.totalMarkedPreempted,
+          res);
+    }
+
+    void unmarkToPreemptContainer(ContainerId containerId) {
+      if (toPreemptContainers.containsKey(containerId)) {
+        ToPreemptContainer container =
+            toPreemptContainers.remove(containerId);
+        DemandingApp app =
+            demandingApps.get(container.resourceRequirement.application
+                .getApplicationAttemptId());
+        if (app != null) {
+          app.unmarkToPreemptContainer(containerId);
+          // Update the marked-preempted resources
+          Resource res = container.container.getAllocatedResource();
+          Resources.subtractFrom(
+              container.containerQueueMeasure.totalMarkedPreempted, res);
+          Resources.subtractFrom(
+              container.demandingQueueMeasure.totalMarkedPreempted, res);
+        }
+      }
+    }
+
+    void unmarkDemandingApp(ApplicationAttemptId appAttemptId) {
+      if (demandingApps.containsKey(appAttemptId)) {
+        DemandingApp demandingApp = demandingApps.remove(appAttemptId);
+        Set<ContainerId> containersRequiredByDemandingApp =
+            demandingApp.toPreemptContainers.keySet();
+        for (ContainerId id : containersRequiredByDemandingApp) {
+          ToPreemptContainer container = toPreemptContainers.remove(id);
+          if (null != container) {
+            // Update the marked-preempted resources
+            Resource res = container.container.getAllocatedResource();
+            Resources.subtractFrom(
+                container.containerQueueMeasure.totalMarkedPreempted, res);
+            Resources.subtractFrom(
+                container.demandingQueueMeasure.totalMarkedPreempted, res);
+          }
+        }
+      }
+    }
+  }
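Note the symmetry in the manager above: marking a container adds its size to two running totals (the queue the container would be taken from and the queue demanding it), and every unmark path subtracts from exactly the same two. A compact sketch of that double-entry invariant (types and names invented for the illustration):

import java.util.HashMap;
import java.util.Map;

public class DoubleEntryMarkSketch {
  static final class Mark {
    final long size;
    final String victimQueue;
    final String demandingQueue;
    Mark(long size, String victimQueue, String demandingQueue) {
      this.size = size;
      this.victimQueue = victimQueue;
      this.demandingQueue = demandingQueue;
    }
  }

  private final Map<String, Mark> openMarks = new HashMap<>();
  private final Map<String, Long> markedByQueue = new HashMap<>();

  void mark(String containerId, long size, String victimQueue,
      String demandingQueue) {
    // Double entry: both sides of the relationship are credited together.
    markedByQueue.merge(victimQueue, size, Long::sum);
    markedByQueue.merge(demandingQueue, size, Long::sum);
    openMarks.put(containerId, new Mark(size, victimQueue, demandingQueue));
  }

  void unmark(String containerId) {
    Mark m = openMarks.remove(containerId);
    if (m == null) return;                 // idempotent, like the manager
    markedByQueue.merge(m.victimQueue, -m.size, Long::sum);
    markedByQueue.merge(m.demandingQueue, -m.size, Long::sum);
  }

  public static void main(String[] args) {
    DoubleEntryMarkSketch ledger = new DoubleEntryMarkSketch();
    ledger.mark("c1", 1024, "victimQ", "needyQ");
    ledger.unmark("c1");
    // Both sides return to zero: the books stay balanced.
    System.out.println(ledger.markedByQueue); // {victimQ=0, needyQ=0}
  }
}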
+  private ResourceCalculator rc;
+  PreemptableEntitiesManager preemptableEntitiesManager =
+      new PreemptableEntitiesManager();
+  PreemptionRelationshipManager preemptionRelationshipManager =
+      new PreemptionRelationshipManager();
+  // Containers selected in this preemption cycle; cleared at the beginning
+  // of every preemption cycle.
+  private Set<ContainerId> selectingContainers = new HashSet<>();
+
+  // To-be-killed containers
+  Set<ContainerId> toKillContainers = new HashSet<>();
+
+  // ResourceUsages
+  private Map<String, ResourceUsage> queueResourceUsages = new HashMap<>();
+
+  private Clock clock = new SystemClock();
+  private ReentrantReadWriteLock.ReadLock readLock;
+  private ReentrantReadWriteLock.WriteLock writeLock;
+
+  // TODO: make this configurable
+  private final static int WAIT_BEFORE_KILL_SEC = 30;
+
+  public PreemptionManager() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  public void init(ResourceCalculator rc) {
+    this.rc = rc;
+  }
+
+  @VisibleForTesting
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  private boolean canPreempt(Resource cluster, Resource markedPreempted,
+      Resource maxPreemptable, Resource current, Resource ideal,
+      Resource newCandidate) {
+    Resource totalMarkedPreemptedResourceIfSelected =
+        Resources.add(markedPreempted, newCandidate);
+
+    if (Resources.fitsIn(rc, cluster, totalMarkedPreemptedResourceIfSelected,
+        maxPreemptable) || Resources
+        .equals(markedPreempted, Resources.none())) {
+      // We require the total marked-to-be-preempted resource to stay within
+      // max-preemptable, OR allow a single container when nothing is marked
+      if (Resources.fitsIn(rc, cluster, totalMarkedPreemptedResourceIfSelected,
+          Resources.subtract(current, ideal))) {
+        // In addition, total-marked-preempted should be <= current - ideal
+        return true;
+      }
+    }
+
+    return false;
+  }
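canPreempt above enforces two ceilings at once, and the first one is deliberately soft: a queue with nothing marked yet may always give up one container, even if that single container overshoots maxPreemptable. A scalar sketch of the same decision (longs in MB instead of Resource objects, names invented):

public class PreemptionCeilingSketch {
  /**
   * marked + candidate must stay within maxPreemptable (unless nothing is
   * marked yet, so at least one container can always move), and must also
   * stay within current - ideal so the victim queue is never pushed below
   * its ideal share.
   */
  static boolean canPreempt(long markedMb, long maxPreemptableMb,
      long currentMb, long idealMb, long candidateMb) {
    long ifSelected = markedMb + candidateMb;
    boolean withinQuota = ifSelected <= maxPreemptableMb || markedMb == 0;
    boolean staysAboveIdeal = ifSelected <= currentMb - idealMb;
    return withinQuota && staysAboveIdeal;
  }

  public static void main(String[] args) {
    // Queue uses 10 GB, ideal 6 GB, may shed 4 GB; 3 GB already marked.
    System.out.println(canPreempt(3072, 4096, 10240, 6144, 1024)); // true
    // A 2 GB candidate would mark 5 GB, over both ceilings: rejected.
    System.out.println(canPreempt(3072, 4096, 10240, 6144, 2048)); // false
    // Nothing marked yet: one oversized container still passes the quota
    // check, and current - ideal (4 GB) accommodates it.
    System.out.println(canPreempt(0, 1024, 10240, 6144, 2048));    // true
  }
}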
+  private List<RMContainer> selectContainersToPreempt(List<RMContainer> candidates,
+      Resource required, Resource cluster, String nodePartition) {
+    long timestamp = System.nanoTime();
+
+    // Assume candidates are sorted in preemption order; earlier items are
+    // preempted first. Scan the list to select the containers to preempt.
+    Resource totalSelected = Resources.createResource(0);
+    List<RMContainer> selected = new ArrayList<>();
+
+    // TODO
+    // Candidates should be sorted so that already-selected containers come
+    // before other containers
+    for (RMContainer candidateContainer : candidates) {
+      // Skip all AM containers OR already selected containers
+      if (candidateContainer.isAMContainer() || selectingContainers
+          .contains(candidateContainer.getContainerId())) {
+        continue;
+      }
+
+      String key = candidateContainer.getQueue() + "_" + nodePartition;
+      PreemptableEntityMeasure measure = preemptableEntitiesManager.get(key);
+      if (measure == null || !measure.debtor) {
+        // Skip non-existent queues and non-debtor queues.
+        continue;
+      }
+
+      Resource markedPreempted =
+          measure.getTotalMarkedPreemptedForDryRun(timestamp);
+
+      // If the container is already on our to-preempt list, deduct it from
+      // the marked-to-preempt total first (assume it will not be preempted
+      // this time); its resource is added back below if it is selected.
+      if (preemptionRelationshipManager.toPreemptContainers
+          .containsKey(candidateContainer.getContainerId())) {
+        Resources.subtractFrom(markedPreempted,
+            candidateContainer.getAllocatedResource());
+      }
+
+      // Check whether we have enough preemption headroom for the candidate
+      ResourceUsage queueResourceUsage =
+          queueResourceUsages.get(candidateContainer.getQueue());
+      if (canPreempt(cluster, markedPreempted, measure.maxPreemptable,
+          queueResourceUsage.getUsed(nodePartition), measure.ideal,
+          candidateContainer.getAllocatedResource())) {
+        Resources
+            .addTo(markedPreempted, candidateContainer.getAllocatedResource());
+        selected.add(candidateContainer);
+
+        // Update the total selected resource as well
+        Resources.addTo(totalSelected, candidateContainer.getAllocatedResource());
+        if (Resources.fitsIn(rc, cluster, required, totalSelected)) {
+          return selected;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  public Resource getTotalResourcesWillBePreemptedByApp(
+      ApplicationAttemptId attemptId, Priority priority, String resourceName) {
+    if (!preemptionRelationshipManager.demandingApps.containsKey(attemptId)) {
+      return Resources.none();
+    }
+
+    DemandingApp app =
+        preemptionRelationshipManager.demandingApps.get(attemptId);
+    if (!app.toPreemptResources.containsKey(priority)) {
+      return Resources.none();
+    }
+
+    Resource res = app.toPreemptResources.get(priority).get(resourceName);
+    return res != null ? res : Resources.none();
+  }
+
+  public boolean canQueuePreemptResourceFromOthers(String queue,
+      String partition) {
+    String key = queue + "_" + partition;
+    PreemptableEntityMeasure measure = preemptableEntitiesManager.get(key);
+    if (measure != null) {
+      return !measure.debtor;
+    }
+
+    return false;
+  }
+
+  public Set<ContainerId> pullContainersToKill() {
+    try {
+      writeLock.lock();
+
+      if (toKillContainers.isEmpty()) {
+        return Collections.emptySet();
+      }
+      Set<ContainerId> ret = toKillContainers;
+      toKillContainers = new HashSet<>();
+      return ret;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public long getWaitBeforeKillMs() {
+    return WAIT_BEFORE_KILL_SEC * 1000L;
+  }
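tryToPreempt below escalates a marked container to the kill list only after the wait-before-kill grace period has elapsed since the container was first marked; clock.getTime() is millisecond-based, so the comparison has to be expressed in milliseconds too. A small standalone sketch of that timestamp bookkeeping (simplified types and invented names, not the YARN implementation):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GracePeriodKillSketch {
  static final long WAIT_BEFORE_KILL_MS = 30_000L; // 30 s, in ms like the clock
  private final Map<String, Long> firstMarkedAtMs = new HashMap<>();
  private final Set<String> toKill = new HashSet<>();

  /** Re-marking keeps the original timestamp so the grace period isn't reset. */
  void markSeen(String containerId, long nowMs) {
    long markedAt = firstMarkedAtMs.merge(containerId, nowMs, Math::min);
    if (nowMs - markedAt > WAIT_BEFORE_KILL_MS) {
      toKill.add(containerId); // still needed after the grace period: kill
    }
  }

  public static void main(String[] args) {
    GracePeriodKillSketch tracker = new GracePeriodKillSketch();
    long t0 = 0L;
    tracker.markSeen("c1", t0);                        // first mark
    tracker.markSeen("c1", t0 + 10_000);               // re-marked: no kill yet
    System.out.println(tracker.toKill.isEmpty());      // true
    tracker.markSeen("c1", t0 + 31_000);               // grace period expired
    System.out.println(tracker.toKill.contains("c1")); // true
  }
}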
+  public boolean tryToPreempt(ResourceRequirement resourceRequirement,
+      Collection<RMContainer> candidatesToPreempt, Resource cluster,
+      String nodePartition) {
+    selectingContainers.clear();
+
+    List<RMContainer> candidates = getContainers(PreemptionType.DIFFERENT_QUEUE,
+        resourceRequirement, candidatesToPreempt);
+
+    List<RMContainer> selectedContainers = selectContainersToPreempt(candidates,
+        resourceRequirement.getRequired(), cluster, nodePartition);
+
+    PreemptableEntityMeasure demandingQueueEntity = null;
+
+    if (selectedContainers != null) {
+      // Update container preemption info
+      for (RMContainer c : selectedContainers) {
+        String key = c.getQueue() + "_" + nodePartition;
+        PreemptableEntityMeasure containerQueueMeasure =
+            preemptableEntitiesManager.get(key);
+        if (demandingQueueEntity == null) {
+          demandingQueueEntity = preemptableEntitiesManager
+              .getOrAddNew(resourceRequirement.application.getQueueName() + "_"
+                  + nodePartition);
+        }
+
+        // Create a ToPreemptContainer if necessary
+        ToPreemptContainer toPreemptContainer =
+            preemptionRelationshipManager.toPreemptContainers
+                .get(c.getContainerId());
+
+        if (toPreemptContainer != null) {
+          // This container is already on our to-preempt containers list...
+
+          if (!toPreemptContainer.resourceRequirement
+              .equals(resourceRequirement)) {
+            // If this container is now demanded by a different resource
+            // requirement, cancel the previous one
+            preemptionRelationshipManager
+                .unmarkToPreemptContainer(c.getContainerId());
+
+            // Create a new ToPreemptContainer; it inherits the previous
+            // startTimestamp so we don't need to wait another full timeout
+            // before killing the container
+            ToPreemptContainer newToPreemptContainer =
+                new ToPreemptContainer(c, toPreemptContainer.startTimestamp,
+                    PreemptionType.DIFFERENT_QUEUE, resourceRequirement,
+                    containerQueueMeasure, demandingQueueEntity);
+            preemptionRelationshipManager.addToPreemptContainer(
+                newToPreemptContainer, resourceRequirement);
+          } else {
+            long currentTime = clock.getTime();
+
+            if (currentTime
+                - toPreemptContainer.startTimestamp > getWaitBeforeKillMs()) {
+              toKillContainers.add(c.getContainerId());
+            }
+
+            // Update the last listed timestamp for this container; this is
+            // used to decide whether a ToPreemptContainer can be removed
+            // from the list
+            toPreemptContainer.lastListedTimestamp = currentTime;
+          }
+        } else {
+          // Create a new ToPreemptContainer
+          ToPreemptContainer newToPreemptContainer = new ToPreemptContainer(c,
+              clock.getTime(), PreemptionType.DIFFERENT_QUEUE,
+              resourceRequirement, containerQueueMeasure, demandingQueueEntity);
+          preemptionRelationshipManager.addToPreemptContainer(
+              newToPreemptContainer, resourceRequirement);
+        }
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  public void updatePreemptableQueuePartitions(
+      Collection<PreemptableQueuePartitionEntity> entities) {
+    try {
+      writeLock.lock();
+      for (PreemptableQueuePartitionEntity entity : entities) {
+        preemptableEntitiesManager.updatePreemptableQueueEntity(
+            entity.getQueueName(), entity.getPartitionName(), entity.getIdeal(),
+            entity.getPreemptable());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+
+  public void unmarkToPreemptContainer(ContainerId containerId) {
+    try {
+      writeLock.lock();
+      preemptionRelationshipManager.unmarkToPreemptContainer(containerId);
+    } finally{
+      writeLock.unlock();
+    }
+  }
+
+  public void unmarkDemandingApp(ApplicationAttemptId attemptId) {
+    try {
+      writeLock.lock();
+      preemptionRelationshipManager.unmarkDemandingApp(attemptId);
+    } finally{
+      writeLock.unlock();
+    }
+  }
+
+  private void updateResourceUsages(CSQueue root) {
+    queueResourceUsages.clear();
+
+    Queue<CSQueue> bfsQueue = new LinkedList<>();
+    bfsQueue.offer(root);
+    while (!bfsQueue.isEmpty()) {
+      CSQueue cur = bfsQueue.remove();
+
+      if (null == cur.getChildQueues() || cur.getChildQueues().isEmpty()) {
+        // add the ResourceUsage of leaf queues to the map
+        queueResourceUsages.put(cur.getQueueName(), cur.getQueueResourceUsage());
+      } else {
+        // add children to the BFS queue
+        for (CSQueue q : cur.getChildQueues()) {
+          bfsQueue.offer(q);
+        }
+      }
+    }
+  }
+
+  public void queueRefreshed(CSQueue root) {
+    try {
+      writeLock.lock();
+      updateResourceUsages(root);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  PreemptionType getPreemptionType(ResourceRequirement requirement,
+      RMContainer preemptionCandidate) {
+    if (!requirement.getApplication().getQueue()
+        .equals(preemptionCandidate.getQueue())) {
+      return PreemptionType.DIFFERENT_QUEUE;
+    } else if (!requirement.getApplication().getUser()
+        .equals(preemptionCandidate.getUser())) {
+      return PreemptionType.SAME_QUEUE_DIFFERENT_USER;
+    } else {
+      return PreemptionType.SAME_QUEUE_SAME_USER;
+    }
+  }
+
+  List<RMContainer> getContainers(PreemptionType preemptionType,
+      ResourceRequirement resourceRequirement,
+
+  List<RMContainer> getContainers(PreemptionType preemptionType,
+      ResourceRequirement resourceRequirement,
+      Collection<RMContainer> candidates) {
+    List<RMContainer> containers = new ArrayList<>();
+    for (RMContainer container : candidates) {
+      if (getPreemptionType(resourceRequirement, container)
+          == preemptionType) {
+        containers.add(container);
+      }
+    }
+
+    return containers;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/ResourceRequirement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/ResourceRequirement.java
new file mode 100644
index 0000000..be9f740
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/ResourceRequirement.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+
+public class ResourceRequirement {
+  final SchedulerApplicationAttempt application;
+  final Resource required;
+  final Priority priority;
+  final String resourceName;
+
+  public ResourceRequirement(SchedulerApplicationAttempt application,
+      Resource required, Priority priority, String resourceName) {
+    this.application = application;
+    this.required = required;
+    this.priority = priority;
+    this.resourceName = resourceName;
+  }
+
+  public SchedulerApplicationAttempt getApplication() {
+    return application;
+  }
+
+  public Resource getRequired() {
+    return required;
+  }
+
+  public Priority getPriority() {
+    return priority;
+  }
+
+  public String getResourceName() {
+    return resourceName;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ResourceRequirement)) {
+      return false;
+    }
+
+    ResourceRequirement other = (ResourceRequirement) obj;
+
+    return application == other.application
+        && required.equals(other.required)
+        && priority.equals(other.priority)
+        && resourceName.equals(other.resourceName);
+  }
+}
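Review note on ResourceRequirement: it overrides equals() without overriding hashCode(), so two requirements that compare equal can land in different hash buckets. Since the PreemptionManager code above hands a ResourceRequirement to addToPreemptContainer() as a lookup key (the backing map is declared outside this excerpt, so this is an inference), a matching hashCode() is worth adding. A minimal sketch, consistent with the equals() above; equals() compares application by identity, so the sketch hashes it by identity too:

    // Hedged sketch, not part of the patch as posted.
    @Override
    public int hashCode() {
      int result = System.identityHashCode(application);
      result = 31 * result + required.hashCode();
      result = 31 * result + priority.hashCode();
      result = 31 * result + resourceName.hashCode();
      return result;
    }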
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 6f4bfe5..5d4ec58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -127,8 +128,14 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
     if (scheduler.getResourceCalculator() != null) {
       rc = scheduler.getResourceCalculator();
     }
-
-    containerAllocator = new ContainerAllocator(this, rc, rmContext);
+
+    PreemptionManager preemptionManager = null;
+    if (scheduler instanceof CapacityScheduler) {
+      preemptionManager =
+          ((CapacityScheduler) scheduler).getPreemptionManager();
+    }
+    containerAllocator =
+        new ContainerAllocator(this, rc, rmContext, preemptionManager);
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -185,7 +192,7 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
     RMContainer rmContainer = new RMContainerImpl(container,
         this.getApplicationAttemptId(), node.getNodeID(),
         appSchedulingInfo.getUser(), this.rmContext,
-        request.getNodeLabelExpression());
+        request.getNodeLabelExpression(), getQueueName());
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);
@@ -472,7 +479,8 @@ public LeafQueue getCSLeafQueue() {
 
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode, RMContainer reservedContainer) {
+      SchedulingMode schedulingMode, RMContainer reservedContainer,
+      boolean dryrun) {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
           + getApplicationId());
@@ -481,7 +489,7 @@ public CSAssignment assignContainers(Resource clusterResource,
 
     synchronized (this) {
       return containerAllocator.assignContainers(clusterResource, node,
-          schedulingMode, currentResourceLimits, reservedContainer);
+          schedulingMode, currentResourceLimits, reservedContainer, dryrun);
     }
   }
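A note on the new dryrun flag: it is threaded from assignContainers() straight into the ContainerAllocator, and judging from how the test below calls cs.allocateContainersToNode(node, true), a dry-run pass appears to compute what the scheduler would allocate on a node (letting the PreemptionManager mark victim containers) without committing the allocation. A hedged sketch of that two-phase pattern; markVictims() is a hypothetical stand-in, not an API from this patch:

    // Sketch only: observe first (dryrun=true), then commit (dryrun=false).
    CSAssignment probe = app.assignContainers(clusterResource, node,
        currentResourceLimits, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
        reservedContainer, true);   // decide; record to-preempt candidates
    markVictims(node, probe);       // hypothetical bookkeeping step
    app.assignContainers(clusterResource, node, currentResourceLimits,
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
        reservedContainer, false);  // real pass: commit the allocation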
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index fe6db47..0e4140c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -35,6 +35,9 @@
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
 
+  // How many times this scheduler node has been accessed by the scheduler
+  private volatile long tick = 0;
+
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
       Set<String> nodeLabels) {
     super(node, usePortForNodeName, nodeLabels);
@@ -43,6 +46,10 @@ public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
     this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
+
+  public long addAndGetTick() {
+    // Pre-increment so the return value matches the add-and-get name
+    return ++tick;
+  }
 
   @Override
   public synchronized void reserveResource(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index abbf77a..9256c8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -344,9 +344,9 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
     }
 
     // Create RMContainer
-    RMContainer rmContainer = new RMContainerImpl(container,
+    RMContainer rmContainer = new RMContainerImpl(container,
        getApplicationAttemptId(), node.getNodeID(),
-        appSchedulingInfo.getUser(), rmContext);
+        appSchedulingInfo.getUser(), rmContext, getQueueName());
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);
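On the tick counter added to FiCaSchedulerNode above: incrementing a volatile long is not atomic (volatile gives visibility, not an atomic read-modify-write), so concurrent callers could lose increments unless every call site already holds the node's lock, which this excerpt doesn't show. If that isn't guaranteed, an AtomicLong is the safer shape; a minimal sketch under that assumption:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: thread-safe variant of the counter, assuming
    // addAndGetTick() may be called without the node lock held.
    private final AtomicLong tick = new AtomicLong();

    public long addAndGetTick() {
      return tick.incrementAndGet(); // atomic add-and-get
    }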
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 489ef77..6a330e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -227,6 +227,42 @@ public static Container getMockContainer(
     Set<E> set = Sets.newHashSet(elements);
     return set;
   }
+
+  /**
+   * Get a queue structure:
+   * <pre>
+   *             Root
+   *            /  |  \
+   *           a   b   c
+   *          10   20  70
+   * </pre>
+   */
+  public static Configuration
+      getConfigurationWithMultipleQueues(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b", "c" });
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 100);
+    conf.setUserLimitFactor(A, 100);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setMaximumCapacity(B, 100);
+    conf.setUserLimitFactor(B, 100);
+
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 100);
+    conf.setUserLimitFactor(C, 100);
+
+    return conf;
+  }
 
   /**
    * Get a queue structure:
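The capacities configured above (10 + 20 + 70) sum to 100, as the CapacityScheduler requires for sibling queues, while a maximum capacity of 100 lets any single queue temporarily absorb the whole cluster; that over-capacity use is exactly what the preemption policy later claws back toward the guarantees. The helper is consumed the way the test below does:

    // Usage mirroring the test that follows.
    YarnConfiguration conf = new YarnConfiguration();
    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf));
    rm.start();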
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/TestCapacitySchedulerPreemption.java
new file mode 100644
index 0000000..09c4aea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/TestCapacitySchedulerPreemption.java
@@ -0,0 +1,170 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityMonitorPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerPreemption {
+  private static final Log LOG = LogFactory
+      .getLog(TestCapacitySchedulerPreemption.class);
+
+  private static final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityMonitorPolicy.class, SchedulingEditPolicy.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
+  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    RMActiveServices activeServices = rm.getRMActiveService();
+    SchedulingMonitor mon = null;
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        mon = (SchedulingMonitor) service;
+        break;
+      }
+    }
+
+    if (mon != null) {
+      return mon.getSchedulingEditPolicy();
+    }
+    return null;
+  }
+
+  @Test(timeout = 60000)
+  public void testSimplePreemptionWithDifferentContainersSize()
+      throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * Two nodes in the cluster, each with 4G of memory.
+     *
+     * app1 is submitted to queue-a first and asks for 8 * 1G containers,
+     * so no resource is left in the cluster.
+     *
+     * app2 is submitted to queue-b and asks for one 4G container; to serve
+     * it, the policy should preempt 4 * 1G containers from app1.
+     */
+    MockRM rm1 =
+        new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf));
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+    // Do allocation 4 times for node1/node2
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 8 containers now, leaving no available resource
+    // in the cluster
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    Assert.assertEquals(8, schedulerApp1.getLiveContainers().size());
+
+    // NM1/NM2 have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemory());
+
+    // Submit app2 to queue-b; it asks for a 4G container for its AM
+    RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "b");
+
+    // Get the edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    PreemptionManager pm = cs.getPreemptionManager();
+    pm.setClock(clock);
+
+    // Do a dryrun node update for nm1. It shouldn't mark any to-preempt
+    // container, since app1's AM container is on nm1
+    cs.allocateContainersToNode(cs.getNode(nm1.getNodeId()), true);
+    Assert.assertEquals(0,
+        pm.preemptionReleationshipManager.toPreemptContainers.size());
+
+    // Do a dryrun node update for nm2. All 4 containers from app1 on nm2
+    // should be listed as to-preempt containers
+    cs.allocateContainersToNode(cs.getNode(nm2.getNodeId()), true);
+    Assert.assertEquals(4,
+        pm.preemptionReleationshipManager.toPreemptContainers.size());
+
+    // After the wait-before-kill time has passed, do another dryrun for nm2
+    // and check that the containers have been added to the to-kill list
+    when(clock.getTime()).thenReturn(pm.getWaitBeforeKillMs() + 1);
+    cs.allocateContainersToNode(cs.getNode(nm2.getNodeId()), true);
+    Assert.assertEquals(4, pm.toKillContainers.size());
+
+    // A real (non-dryrun) node update should kill the 4 containers and
+    // allocate app2's 4G AM container
+    cs.allocateContainersToNode(cs.getNode(nm2.getNodeId()), false);
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getSchedulerApplications().get(app2.getApplicationId())
+            .getCurrentAppAttempt();
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+
+    rm1.close();
+  }
+}
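As a recap of the timing contract this test drives through the mocked Clock: a container moves from the to-preempt list to the to-kill list only once the clock has advanced past the wait-before-kill interval, measured from the time the container was first marked. A sketch with an illustrative interval (the real value comes from pm.getWaitBeforeKillMs()):

    // Illustrative timeline of the kill-timer check exercised above.
    long marked = 0L;                  // clock time of the first dryrun
    long waitBeforeKillMs = 10_000L;   // assumed interval, for illustration

    boolean killAtFirstPass = (0L - marked) > waitBeforeKillMs;
    // false: the container is only listed as to-preempt

    boolean killAtSecondPass =
        (waitBeforeKillMs + 1 - marked) > waitBeforeKillMs;
    // true: past the deadline, so it moves to the to-kill list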