diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java new file mode 100644 index 0000000..a0f5cda --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.util.Collection; +import java.util.Set; + +interface CapacitySchedulerPreemptionContext { + CapacityScheduler getScheduler(); + + TempQueuePerPartition getQueueByPartition(String queueName, + String partition); + + Collection<TempQueuePerPartition> getQueuePartitions(String queueName); + + ResourceCalculator getResourceCalculator(); + + RMContext getRMContext(); + + boolean isObserveOnly(); + + Set<ContainerId> getKillableContainers(); +}
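This context interface is the seam that decouples candidate selection from ProportionalCapacityPreemptionPolicy itself. As a minimal sketch of what a consumer looks like (the class below is hypothetical and not part of this patch; it builds on the PreemptionCandidatesSelectionPolicy base class introduced later in this diff):

```java
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;

import java.util.Map;
import java.util.Set;

// Hypothetical, illustration only: a selection policy that consults the
// shared context but selects nothing new.
class NoOpPreemptionCandidatesSelectionPolicy
    extends PreemptionCandidatesSelectionPolicy {
  NoOpPreemptionCandidatesSelectionPolicy(
      CapacitySchedulerPreemptionContext context) {
    super(context);
  }

  @Override
  public Map<ApplicationAttemptId, Set<RMContainer>> getPreemptionCandidates(
      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
      Set<String> leafQueueNames, Resource clusterResource) {
    // A real policy would walk preemptionContext.getQueuePartitions(...)
    // and honor preemptionContext.isObserveOnly(); this sketch simply
    // passes earlier selections through unchanged.
    return selectedCandidates;
  }
}
```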
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java new file mode 100644 index 0000000..9f45c69 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.HashMap; +import java.util.Map; + +public class CapacitySchedulerPreemptionUtils { + public static Map<String, Resource> getResToObtainByPartitionForLeafQueue( + CapacitySchedulerPreemptionContext context, String queueName, + Resource clusterResource) { + Map<String, Resource> resToObtain = new HashMap<>(); + // compute resToObtainByPartition considering inter-queue preemption + for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) { + // Only add resToObtain when it > 0 + if (Resources.greaterThan(context.getResourceCalculator(), + clusterResource, qT.actuallyToBePreempted, Resources.none())) { + resToObtain.put(qT.partition, qT.actuallyToBePreempted); + } + } + + return resToObtain; + } +}
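A hypothetical call site, to show how the helper is meant to be consumed by a selection policy (the queue name "root.a" and the context/clusterResource variables are assumptions, not from the patch):

```java
// Collect what still has to be reclaimed from leaf queue "root.a"
// (hypothetical name), keyed by node partition, before scanning containers.
Map<String, Resource> resToObtain = CapacitySchedulerPreemptionUtils
    .getResToObtainByPartitionForLeafQueue(context, "root.a",
        clusterResource);
for (Map.Entry<String, Resource> e : resToObtain.entrySet()) {
  // e.g. partition "" -> <memory:4096, vCores:4> still to be obtained
}
```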
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoPreemptionCandidatesSelectionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoPreemptionCandidatesSelectionPolicy.java new file mode 100644 index 0000000..19fbae7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoPreemptionCandidatesSelectionPolicy.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +public class FifoPreemptionCandidatesSelectionPolicy + extends PreemptionCandidatesSelectionPolicy { + private static final Log LOG = + LogFactory.getLog(FifoPreemptionCandidatesSelectionPolicy.class); + + FifoPreemptionCandidatesSelectionPolicy( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + } + + @Override + public Map<ApplicationAttemptId, Set<RMContainer>> getPreemptionCandidates( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Set<String> leafQueueNames, Resource clusterResource) { + + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap = + new HashMap<>(); + List<RMContainer> skippedAMContainerlist = new ArrayList<>(); + + // Loop all leaf queues + for (String queueName : leafQueueNames) { + // check if preemption disabled for the queue + if (preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("skipping from queue=" + queueName + + " because it's a non-preemptable queue"); + } + continue; + } + + // compute resToObtainByPartition considering inter-queue preemption + LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).leafQueue; + + Map<String, Resource> resToObtainByPartition = + CapacitySchedulerPreemptionUtils + .getResToObtainByPartitionForLeafQueue(preemptionContext, + queueName, clusterResource); + + synchronized (leafQueue) { + // go through all ignore-partition-exclusivity containers first to make + // sure such containers are selected as preemption candidates first + Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers = + leafQueue.getIgnoreExclusivityRMContainers(); + for (String partition : resToObtainByPartition.keySet()) { + if (ignorePartitionExclusivityContainers.containsKey(partition)) { + TreeSet<RMContainer> rmContainers = + ignorePartitionExclusivityContainers.get(partition); + // We will check containers in reverse order, so the containers of + // later-submitted applications are selected first.
+ for (RMContainer c : rmContainers.descendingSet()) { + boolean preempted = + tryPreemptContainerAndDeductResToObtain( + resToObtainByPartition, c, clusterResource, preemptMap); + if (!preempted) { + break; + } + } + } + } + + // preempt other containers + Resource skippedAMSize = Resource.newInstance(0, 0); + Iterator<FiCaSchedulerApp> desc = + leafQueue.getOrderingPolicy().getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp fc = desc.next(); + // Once we have finished preempting from one partition, its entry is + // removed from resToObtainByPartition, so when the map becomes empty + // no more preemption is needed + if (resToObtainByPartition.isEmpty()) { + break; + } + + preemptFrom(fc, clusterResource, resToObtainByPartition, + skippedAMContainerlist, skippedAMSize, preemptMap); + } + + // Can try preempting AMContainers (still saving at most + // maxAMCapacityForThisQueue of AM resources) if more resources are + // required to be preempted from this queue. + Resource maxAMCapacityForThisQueue = Resources.multiply( + Resources.multiply(clusterResource, + leafQueue.getAbsoluteCapacity()), + leafQueue.getMaxAMResourcePerQueuePercent()); + + preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, + resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue); + } + } + + return preemptMap; + } + + /** + * As more resources are needed for preemption, saved AMContainers have to be + * rescanned. Such AMContainers can be selected as preemption candidates based + * on resToObtain, but maxAMCapacityForThisQueue resources will still be + * retained. + * + * @param clusterResource + * @param preemptMap + * @param skippedAMContainerlist + * @param skippedAMSize + * @param maxAMCapacityForThisQueue + */ + private void preemptAMContainers(Resource clusterResource, + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + List<RMContainer> skippedAMContainerlist, + Map<String, Resource> resToObtainByPartition, Resource skippedAMSize, + Resource maxAMCapacityForThisQueue) { + for (RMContainer c : skippedAMContainerlist) { + // Got required amount of resources for preemption, can stop now + if (resToObtainByPartition.isEmpty()) { + break; + } + // Once skippedAMSize drops to maxAMCapacityForThisQueue, container + // selection for preemption is stopped. + if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, + maxAMCapacityForThisQueue)) { + break; + } + + boolean preempted = + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + if (preempted) { + Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); + } + } + skippedAMContainerlist.clear(); + } + + private boolean preemptMapContains( + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set<RMContainer> rmContainers; + if (null == (rmContainers = preemptMap.get(attemptId))) { + return false; + } + return rmContainers.contains(rmContainer); + } + + /** + * Return whether we should preempt rmContainer.
If we should, deduct its resource from + * resourceToObtainByPartition */ + private boolean tryPreemptContainerAndDeductResToObtain( + Map<String, Resource> resourceToObtainByPartitions, + RMContainer rmContainer, Resource clusterResource, + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account a container's resource more than once + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); + Resource toObtainByPartition = + resourceToObtainByPartitions.get(nodePartition); + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none())) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + // When there is no more resource to obtain, remove the partition from + // the map. + if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resourceToObtainByPartitions.remove(nodePartition); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Marked container=" + rmContainer.getContainerId() + + " in partition=" + nodePartition + + " as a preemption candidate"); + } + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + private String getPartitionByNodeId(NodeId nodeId) { + return preemptionContext.getScheduler().getSchedulerNode(nodeId) + .getPartition(); + } + + /** + * Given a target preemption for a specific application, select containers + * to preempt (after unreserving all reservation for that app). + */ + @SuppressWarnings("unchecked") + private void preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Map<String, Resource> resToObtainByPartition, + List<RMContainer> skippedAMContainerlist, Resource skippedAMSize, + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) { + ApplicationAttemptId appId = app.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Looking at application=" + app.getApplicationAttemptId() + + " resourceToObtain=" + resToObtainByPartition); + } + + // first drop reserved containers towards rsrcPreempt + List<RMContainer> reservedContainers = + new ArrayList<>(app.getReservedContainers()); + for (RMContainer c : reservedContainers) { + if (resToObtainByPartition.isEmpty()) { + return; + } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + + if (!preemptionContext.isObserveOnly()) { + preemptionContext.getRMContext().getDispatcher().getEventHandler() + .handle(new ContainerPreemptEvent(appId, c, + SchedulerEventType.KILL_RESERVED_CONTAINER)); + } + } + + // if more resources are to be freed go through all live containers in + // reverse priority and reverse allocation order and mark them for + // preemption + List<RMContainer> liveContainers = + new ArrayList<>(app.getLiveContainers()); + + sortContainers(liveContainers); + + for (RMContainer c : liveContainers) { + if (resToObtainByPartition.isEmpty()) { + return; + } + + // Skip containers already marked killable + if (preemptionContext.getKillableContainers().contains( + c.getContainerId())) { + continue; + } + + // Skip AM Container from preemption for now.
+ if (c.isAMContainer()) { + skippedAMContainerlist.add(c); + Resources.addTo(skippedAMSize, c.getAllocatedResource()); + continue; + } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + } + } + + /** + * Compare by reversed priority order first, and then reversed containerId + * order. + * @param containers the list of containers to sort + */ + @VisibleForTesting + static void sortContainers(List<RMContainer> containers){ + Collections.sort(containers, new Comparator<RMContainer>() { + @Override + public int compare(RMContainer a, RMContainer b) { + Comparator<Priority> c = new org.apache.hadoop.yarn.server + .resourcemanager.resource.Priority.Comparator(); + int priorityComp = c.compare(b.getContainer().getPriority(), + a.getContainer().getPriority()); + if (priorityComp != 0) { + return priorityComp; + } + return b.getContainerId().compareTo(a.getContainerId()); + } + }); + } + + private void addToPreemptMap( + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set<RMContainer> set; + if (null == (set = preemptMap.get(appAttemptId))) { + set = new HashSet<>(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } +}
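For intuition about sortContainers(), suppose (hypothetically) three containers with priorities and ids (P, id=5), (Q, id=9) and (P, id=8), where the Priority comparator orders Q ahead of P: the sorted result is (Q, id=9), (P, id=8), (P, id=5). Priorities are compared with the arguments reversed first; ties are then broken by descending container id, so within one priority the most recently allocated container is preempted first, which tends to minimize lost work.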
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelectionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelectionPolicy.java new file mode 100644 index 0000000..cc29738 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelectionPolicy.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.util.Map; +import java.util.Set; + +public abstract class PreemptionCandidatesSelectionPolicy { + protected CapacitySchedulerPreemptionContext preemptionContext; + protected ResourceCalculator rc; + + PreemptionCandidatesSelectionPolicy( + CapacitySchedulerPreemptionContext preemptionContext) { + this.preemptionContext = preemptionContext; + this.rc = preemptionContext.getResourceCalculator(); + } + + /** + * Get preemption candidates from computed resource sharing and already + * selected candidates. + * + * @param selectedCandidates already selected candidates from previous policies + * @param leafQueueNames set of leaf queue names to select candidates from + * @param clusterResource total resource of the cluster + * @return merged selected candidates. + */ + public abstract Map<ApplicationAttemptId, Set<RMContainer>> getPreemptionCandidates( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Set<String> leafQueueNames, Resource clusterResource); +}
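The chaining contract here: each policy receives the candidates chosen so far and returns the merged map, so a policy registered later can only add to (or account for) earlier selections. A sketch of the intended extension point in init() (only the FIFO policy is registered by this patch; the second policy name below is hypothetical):

```java
// initialize preemption candidates selection policies; later policies see
// (and may add to) the selections of earlier ones.
candidatesSelectionPolicies.add(
    new FifoPreemptionCandidatesSelectionPolicy(this));
// A future, hypothetical second stage could be appended the same way:
// candidatesSelectionPolicies.add(
//     new IntraQueuePreemptionCandidatesSelectionPolicy(this));
```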
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 9b499c8..a83d223 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -17,26 +17,13 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.TreeSet; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -50,7 +37,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; @@ -58,8 +44,18 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; /** * This class implements a {@link SchedulingEditPolicy} that is designed to be @@ -80,7 +76,8 @@ * this policy will trigger forced termination of containers (again by generating * {@link ContainerPreemptEvent}). */ -public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy { +public class ProportionalCapacityPreemptionPolicy + implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext { private static final Log LOG = LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class); @@ -96,7 +93,7 @@ * and killing the container. */ public static final String WAIT_TIME_BEFORE_KILL = "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill"; - /** Maximum percentage of resources preempted in a single round. By + /** Maximum percentage of resources selected as preemption candidates in a single round. By * controlling this value one can throttle the pace at which containers are * reclaimed from the cluster. After computing the total desired preemption, * the policy scales it back within this limit.
*/ @@ -126,7 +123,9 @@ private long maxWaitTime; private CapacityScheduler scheduler; private long monitoringInterval; - private final Map<RMContainer, Long> preempted = new HashMap<>(); + + private final Map<RMContainer, Long> preemptionCandidates = + new HashMap<>(); private ResourceCalculator rc; private float percentageClusterPreemptionAllowed; @@ -135,6 +134,8 @@ private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions = new HashMap<>(); private RMNodeLabelsManager nlm; + private List<PreemptionCandidatesSelectionPolicy> + candidatesSelectionPolicies = new ArrayList<>(); // Preemptable Entities, synced from scheduler at every run private Map<String, PreemptableQueue> preemptableEntities = null; @@ -176,9 +177,13 @@ public void init(Configuration config, RMContext context, observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); + + // initialize preemption candidates selection policies + candidatesSelectionPolicies.add( + new FifoPreemptionCandidatesSelectionPolicy(this)); } - @VisibleForTesting + @Override public ResourceCalculator getResourceCalculator() { return rc; } @@ -199,7 +204,8 @@ private void cleanupStaledKillableContainers(Resource cluster, // to check if any of killable containers needs to be reverted if (Resources.lessThanOrEqual(rc, cluster, Resources.subtract(tq.current, tq.killable), tq.idealAssigned) - && Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) { + && Resources.greaterThan(rc, cluster, tq.killable, + Resources.none())) { // How many killable resources need to be reverted // need-to-revert = already-marked-killable - (current - ideal) Resource toBeRevertedFromKillable = Resources.subtract(tq.killable, @@ -226,7 +232,43 @@ private void cleanupStaledKillableContainers(Resource cluster, SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE)); } } + } + } + } + } + @SuppressWarnings("unchecked") + private void killSelectedContainerAfterWait( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) { + // preempt (or kill) the selected containers + for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates + .entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + appAttemptId + + " #containers-to-be-preempted=" + e.getValue().size()); + } + for (RMContainer container : e.getValue()) { + // if we tried to preempt this for more than maxWaitTime + if (preemptionCandidates.get(container) != null + && preemptionCandidates.get(container) + maxWaitTime < clock + .getTime()) { + // kill it + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + preemptionCandidates.remove(container); + } else { + if (preemptionCandidates.get(container) != null) { + // We already updated the information to the scheduler earlier; we + // do not have to raise another event.
+ continue; + } + //otherwise just send preemption events + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); + preemptionCandidates.put(container, clock.getTime()); } } } @@ -247,9 +289,61 @@ private void syncKillableContainersFromScheduler() { } } } + + private void cleanupStaledPreemptionCandidates() { + // Keep the preemptionCandidates list clean + for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator(); + i.hasNext(); ) { + RMContainer id = i.next(); + // garbage collect containers that are irrelevant for preemption + if (preemptionCandidates.get(id) + 2 * maxWaitTime < clock.getTime()) { + i.remove(); + } + } + } + + private void calculateResToObtainByPartitionForLeafQueues( + Set<String> leafQueueNames, Resource clusterResource) { + // Loop all leaf queues + for (String queueName : leafQueueNames) { + // check if preemption disabled for the queue + if (getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("skipping from queue=" + queueName + + " because it's a non-preemptable queue"); + } + continue; + } + + // compute resToObtainByPartition considering inter-queue preemption + for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { + // we act only if we are violating balance by more than + // maxIgnoredOverCapacity + if (Resources.greaterThan(rc, clusterResource, qT.current, + Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { + // we introduce a dampening factor naturalTerminationFactor that + // accounts for natural termination of containers + Resource resToObtain = Resources.multiply(qT.toBePreempted, + naturalTerminationFactor); + // Only log resToObtain when it > 0 + if (Resources.greaterThan(rc, clusterResource, resToObtain, + Resources.none())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue=" + queueName + " partition=" + qT.partition + + " resource-to-obtain=" + resToObtain); + } + } + qT.actuallyToBePreempted = Resources.clone(resToObtain); + } else { + qT.actuallyToBePreempted = Resources.none(); + } + } + } + }
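With hypothetical numbers, the dampening in calculateResToObtainByPartitionForLeafQueues() works out as follows: with guaranteed = 10 GB and maxIgnoredOverCapacity = 0.1, a queue only becomes actionable once its current usage exceeds 11 GB; if current = 16 GB and the ideal-driven toBePreempted = 4 GB, then with naturalTerminationFactor = 0.2 only 4 GB × 0.2 = 0.8 GB is recorded in actuallyToBePreempted for this round, the remainder being left to natural container completion.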
/** - * This method selects and tracks containers to be preempted. If a container + * This method selects and tracks containers to be preemption candidates. If a container * is in the target list for more than maxWaitTime it is killed. * * @param root the root of the CapacityScheduler queue hierarchy @@ -298,9 +392,18 @@ private void containerBasedPreemptOrKill(CSQueue root, cleanupStaledKillableContainers(clusterResources, leafQueueNames); // based on ideal allocation select containers to be preempted from each + // calculate resource-to-obtain by partition for each leaf queue + calculateResToObtainByPartitionForLeafQueues(leafQueueNames, + clusterResources); + + // based on ideal allocation select containers to be preemption candidates from each // queue and each application - Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = - getContainersToPreempt(leafQueueNames, clusterResources); + Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = null; + for (PreemptionCandidatesSelectionPolicy selectionPolicy : + candidatesSelectionPolicies) { + toPreempt = selectionPolicy.getPreemptionCandidates(toPreempt, + leafQueueNames, clusterResources); + } if (LOG.isDebugEnabled()) { logToCSV(new ArrayList<>(leafQueueNames)); @@ -312,51 +415,16 @@ } // preempt (or kill) the selected containers - for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e - : toPreempt.entrySet()) { - ApplicationAttemptId appAttemptId = e.getKey(); - if (LOG.isDebugEnabled()) { - LOG.debug("Send to scheduler: in app=" + appAttemptId - + " #containers-to-be-preempted=" + e.getValue().size()); - } - for (RMContainer container : e.getValue()) { - // if we tried to preempt this for more than maxWaitTime - if (preempted.get(container) != null && - preempted.get(container) + maxWaitTime < clock.getTime()) { - // mark container killable - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); - preempted.remove(container); - } else { - if (preempted.get(container) != null) { - // We already updated the information to scheduler earlier, we need - // not have to raise another event. - continue; - } - //otherwise just send preemption events - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); - preempted.put(container, clock.getTime()); - } - } - } + killSelectedContainerAfterWait(toPreempt); - // Keep the preempted list clean - for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){ - RMContainer id = i.next(); - // garbage collect containers that are irrelevant for preemption - if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) { - i.remove(); - } - } + // clean up stale preemption candidates + cleanupStaledPreemptionCandidates(); } /** * This method recursively computes the ideal assignment of resources to each * level of the hierarchy. This ensures that leaves that are over-capacity but - * with parents within capacity will not be preempted. Preemptions are allowed + * with parents within capacity will not yield preemption candidates. Preemptions are allowed * within each subtree according to local over/under capacity. * * @param root the root of the cloned queue hierarchy @@ -589,306 +657,6 @@ private String getPartitionByRMContainer(RMContainer rmContainer) { .getPartition(); } - /** - * Return should we preempt rmContainer.
If we should, deduct from - * resourceToObtainByPartition */ - private boolean tryPreemptContainerAndDeductResToObtain( - Map<String, Resource> resourceToObtainByPartitions, - RMContainer rmContainer, Resource clusterResource, - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) { - ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); - - // We will not account resource of a container twice or more - if (preemptMapContains(preemptMap, attemptId, rmContainer)) { - return false; - } - - String nodePartition = getPartitionByRMContainer(rmContainer); - Resource toObtainByPartition = - resourceToObtainByPartitions.get(nodePartition); - - if (null != toObtainByPartition - && Resources.greaterThan(rc, clusterResource, toObtainByPartition, - Resources.none())) { - Resources.subtractFrom(toObtainByPartition, - rmContainer.getAllocatedResource()); - // When we have no more resource need to obtain, remove from map. - if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, - Resources.none())) { - resourceToObtainByPartitions.remove(nodePartition); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Marked container=" + rmContainer.getContainerId() - + " in partition=" + nodePartition + " will be preempted"); - } - // Add to preemptMap - addToPreemptMap(preemptMap, attemptId, rmContainer); - return true; - } - - return false; - } - - private boolean preemptMapContains( - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, - ApplicationAttemptId attemptId, RMContainer rmContainer) { - Set<RMContainer> rmContainers; - if (null == (rmContainers = preemptMap.get(attemptId))) { - return false; - } - return rmContainers.contains(rmContainer); - } - - private void addToPreemptMap( - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, - ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set<RMContainer> set; - if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); - } - set.add(containerToPreempt); - } - - /** - * Based a resource preemption target drop reservations of containers and - * if necessary select containers for preemption from applications in each - * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to - * account for containers that will naturally complete.
- * - * @param leafQueueNames set of leaf queues to preempt from - * @param clusterResource total amount of cluster resources - * @return a map of applciationID to set of containers to preempt - */ - private Map<ApplicationAttemptId, Set<RMContainer>> getContainersToPreempt( - Set<String> leafQueueNames, Resource clusterResource) { - - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap = - new HashMap<>(); - List<RMContainer> skippedAMContainerlist = new ArrayList<>(); - - // Loop all leaf queues - for (String queueName : leafQueueNames) { - // check if preemption disabled for the queue - if (getQueueByPartition(queueName, - RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { - if (LOG.isDebugEnabled()) { - LOG.debug("skipping from queue=" + queueName - + " because it's a non-preemptable queue"); - } - continue; - } - - // compute resToObtainByPartition considered inter-queue preemption - LeafQueue leafQueue = null; - - Map<String, Resource> resToObtainByPartition = - new HashMap<>(); - for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { - leafQueue = qT.leafQueue; - // we act only if we are violating balance by more than - // maxIgnoredOverCapacity - if (Resources.greaterThan(rc, clusterResource, qT.current, - Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { - // we introduce a dampening factor naturalTerminationFactor that - // accounts for natural termination of containers - Resource resToObtain = - Resources.multiply(qT.toBePreempted, naturalTerminationFactor); - // Only add resToObtain when it >= 0 - if (Resources.greaterThan(rc, clusterResource, resToObtain, - Resources.none())) { - resToObtainByPartition.put(qT.partition, resToObtain); - if (LOG.isDebugEnabled()) { - LOG.debug("Queue=" + queueName + " partition=" + qT.partition - + " resource-to-obtain=" + resToObtain); - } - } - qT.actuallyPreempted = Resources.clone(resToObtain); - } else { - qT.actuallyPreempted = Resources.none(); - } - } - - synchronized (leafQueue) { - // go through all ignore-partition-exclusivity containers first to make - // sure such containers will be preempted first - Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers = - leafQueue.getIgnoreExclusivityRMContainers(); - for (String partition : resToObtainByPartition.keySet()) { - if (ignorePartitionExclusivityContainers.containsKey(partition)) { - TreeSet<RMContainer> rmContainers = - ignorePartitionExclusivityContainers.get(partition); - // We will check container from reverse order, so latter submitted - // application's containers will be preempted first. - for (RMContainer c : rmContainers.descendingSet()) { - boolean preempted = - tryPreemptContainerAndDeductResToObtain( - resToObtainByPartition, c, clusterResource, preemptMap); - if (!preempted) { - break; - } - } - } - } - - // preempt other containers - Resource skippedAMSize = Resource.newInstance(0, 0); - Iterator<FiCaSchedulerApp> desc = - leafQueue.getOrderingPolicy().getPreemptionIterator(); - while (desc.hasNext()) { - FiCaSchedulerApp fc = desc.next(); - // When we complete preempt from one partition, we will remove from - // resToObtainByPartition, so when it becomes empty, we can get no - // more preemption is needed - if (resToObtainByPartition.isEmpty()) { - break; - } - - preemptFrom(fc, clusterResource, resToObtainByPartition, - skippedAMContainerlist, skippedAMSize, preemptMap); - } - - // Can try preempting AMContainers (still saving atmost - // maxAMCapacityForThisQueue AMResource's) if more resources are - // required to be preempted from this Queue.
- Resource maxAMCapacityForThisQueue = Resources.multiply( - Resources.multiply(clusterResource, - leafQueue.getAbsoluteCapacity()), - leafQueue.getMaxAMResourcePerQueuePercent()); - - preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, - resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue); - } - } - - return preemptMap; - } - - /** - * As more resources are needed for preemption, saved AMContainers has to be - * rescanned. Such AMContainers can be preempted based on resToObtain, but - * maxAMCapacityForThisQueue resources will be still retained. - * - * @param clusterResource - * @param preemptMap - * @param skippedAMContainerlist - * @param skippedAMSize - * @param maxAMCapacityForThisQueue - */ - private void preemptAMContainers(Resource clusterResource, - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, - List<RMContainer> skippedAMContainerlist, - Map<String, Resource> resToObtainByPartition, Resource skippedAMSize, - Resource maxAMCapacityForThisQueue) { - for (RMContainer c : skippedAMContainerlist) { - // Got required amount of resources for preemption, can stop now - if (resToObtainByPartition.isEmpty()) { - break; - } - // Once skippedAMSize reaches down to maxAMCapacityForThisQueue, - // container selection iteration for preemption will be stopped. - if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, - maxAMCapacityForThisQueue)) { - break; - } - - boolean preempted = - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap); - if (preempted) { - Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); - } - } - skippedAMContainerlist.clear(); - } - - /** - * Given a target preemption for a specific application, select containers - * to preempt (after unreserving all reservation for that app). - */ - @SuppressWarnings("unchecked") - private void preemptFrom(FiCaSchedulerApp app, - Resource clusterResource, Map<String, Resource> resToObtainByPartition, - List<RMContainer> skippedAMContainerlist, Resource skippedAMSize, - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) { - ApplicationAttemptId appId = app.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Looking at application=" + app.getApplicationAttemptId() - + " resourceToObtain=" + resToObtainByPartition); - } - - // first drop reserved containers towards rsrcPreempt - List<RMContainer> reservedContainers = - new ArrayList<>(app.getReservedContainers()); - for (RMContainer c : reservedContainers) { - if (resToObtainByPartition.isEmpty()) { - return; - } - - // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap); - - if (!observeOnly) { - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent( - appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER)); - } - } - - // if more resources are to be freed go through all live containers in - // reverse priority and reverse allocation order and mark them for - // preemption - List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers()); - - sortContainers(liveContainers); - - for (RMContainer c : liveContainers) { - if (resToObtainByPartition.isEmpty()) { - return; - } - - // Skip AM Container from preemption for now.
- if (c.isAMContainer()) { - skippedAMContainerlist.add(c); - Resources.addTo(skippedAMSize, c.getAllocatedResource()); - continue; - } - - // Skip already marked to killable containers - if (killableContainers.contains(c.getContainerId())) { - continue; - } - - // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap); - } - } - - /** - * Compare by reversed priority order first, and then reversed containerId - * order - * @param containers - */ - @VisibleForTesting - static void sortContainers(List<RMContainer> containers){ - Collections.sort(containers, new Comparator<RMContainer>() { - @Override - public int compare(RMContainer a, RMContainer b) { - Comparator<Priority> c = new org.apache.hadoop.yarn.server - .resourcemanager.resource.Priority.Comparator(); - int priorityComp = c.compare(b.getContainer().getPriority(), - a.getContainer().getPriority()); - if (priorityComp != 0) { - return priorityComp; - } - return b.getContainerId().compareTo(a.getContainerId()); - } - }); - } - @Override public long getMonitoringInterval() { return monitoringInterval; @@ -901,7 +669,7 @@ public String getPolicyName() { @VisibleForTesting public Map<RMContainer, Long> getToPreemptContainers() { - return preempted; + return preemptionCandidates; } /** @@ -1023,7 +791,8 @@ private void addTempQueuePartition(TempQueuePerPartition queuePartition) { /** * Get queue partition by given queueName and partitionName */ - private TempQueuePerPartition getQueueByPartition(String queueName, + @Override + public TempQueuePerPartition getQueueByPartition(String queueName, String partition) { Map<String, TempQueuePerPartition> partitionToQueues = null; if (null == (partitionToQueues = queueToPartitions.get(queueName))) { @@ -1035,142 +804,32 @@ private TempQueuePerPartition getQueueByPartition(String queueName, /** * Get all queue partitions by given queueName */ - private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) { + @Override + public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) { if (!queueToPartitions.containsKey(queueName)) { return null; } return queueToPartitions.get(queueName).values(); } - /** - * Temporary data-structure tracking resource availability, pending resource - * need, current utilization.
This is per-queue-per-partition data structure - */ - static class TempQueuePerPartition { - final String queueName; - final Resource current; - final Resource pending; - final Resource guaranteed; - final Resource maxCapacity; - final String partition; - final Resource killable; - Resource idealAssigned; - Resource toBePreempted; - - // For logging purpose - Resource actuallyPreempted; - Resource untouchableExtra; - Resource preemptableExtra; - - double normalizedGuarantee; - - final ArrayList<TempQueuePerPartition> children; - LeafQueue leafQueue; - boolean preemptionDisabled; - - TempQueuePerPartition(String queueName, Resource current, Resource pending, - Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, - String partition, Resource killableResource) { - this.queueName = queueName; - this.current = current; - this.pending = pending; - this.guaranteed = guaranteed; - this.maxCapacity = maxCapacity; - this.idealAssigned = Resource.newInstance(0, 0); - this.actuallyPreempted = Resource.newInstance(0, 0); - this.toBePreempted = Resource.newInstance(0, 0); - this.normalizedGuarantee = Float.NaN; - this.children = new ArrayList<>(); - this.untouchableExtra = Resource.newInstance(0, 0); - this.preemptableExtra = Resource.newInstance(0, 0); - this.preemptionDisabled = preemptionDisabled; - this.partition = partition; - this.killable = killableResource; - } - - public void setLeafQueue(LeafQueue l){ - assert children.size() == 0; - this.leafQueue = l; - } - - /** - * When adding a child we also aggregate its pending resource needs. - * @param q the child queue to add to this queue - */ - public void addChild(TempQueuePerPartition q) { - assert leafQueue == null; - children.add(q); - Resources.addTo(pending, q.pending); - } - - public ArrayList<TempQueuePerPartition> getChildren(){ - return children; - } - - // This function "accepts" all the resources it can (pending) and return - // the unused ones - Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource) { - Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( - Resources.subtract(maxCapacity, idealAssigned), - Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) - Resource accepted = - Resources.min(rc, clusterResource, - absMaxCapIdealAssignedDelta, - Resources.min(rc, clusterResource, avail, Resources.subtract( - Resources.add(current, pending), idealAssigned))); - Resource remain = Resources.subtract(avail, accepted); - Resources.addTo(idealAssigned, accepted); - return remain; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(" NAME: " + queueName) - .append(" CUR: ").append(current) - .append(" PEN: ").append(pending) - .append(" GAR: ").append(guaranteed) - .append(" NORM: ").append(normalizedGuarantee) - .append(" IDEAL_ASSIGNED: ").append(idealAssigned) - .append(" IDEAL_PREEMPT: ").append(toBePreempted) - .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted) - .append(" UNTOUCHABLE: ").append(untouchableExtra) - .append(" PREEMPTABLE: ").append(preemptableExtra) - .append("\n"); - - return sb.toString(); - } + @Override + public CapacityScheduler getScheduler() { + return scheduler; + } - public void assignPreemption(float scalingFactor, - ResourceCalculator rc, Resource clusterResource) { - if (Resources.greaterThan(rc, clusterResource, - Resources.subtract(current, killable), idealAssigned)) { - toBePreempted = Resources.multiply(Resources.subtract( - Resources.subtract(current, killable), idealAssigned), -
scalingFactor); - } else { - toBePreempted = Resource.newInstance(0, 0); - } - } + @Override + public RMContext getRMContext() { + return rmContext; + } - void appendLogString(StringBuilder sb) { - sb.append(queueName).append(", ") - .append(current.getMemory()).append(", ") - .append(current.getVirtualCores()).append(", ") - .append(pending.getMemory()).append(", ") - .append(pending.getVirtualCores()).append(", ") - .append(guaranteed.getMemory()).append(", ") - .append(guaranteed.getVirtualCores()).append(", ") - .append(idealAssigned.getMemory()).append(", ") - .append(idealAssigned.getVirtualCores()).append(", ") - .append(toBePreempted.getMemory()).append(", ") - .append(toBePreempted.getVirtualCores() ).append(", ") - .append(actuallyPreempted.getMemory()).append(", ") - .append(actuallyPreempted.getVirtualCores()); - } + @Override + public boolean isObserveOnly() { + return observeOnly; + } + @Override + public Set<ContainerId> getKillableContainers() { + return killableContainers; } static class TQComparator implements Comparator<TempQueuePerPartition> {
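Taken together with killSelectedContainerAfterWait() and cleanupStaledPreemptionCandidates() above, this bookkeeping implements a two-step kill. With hypothetical times and maxWaitTime = 15 s: a container first selected at t = 0 gets MARK_CONTAINER_FOR_PREEMPTION and is remembered in preemptionCandidates; if it is still selected at t = 16 s (0 + 15 < 16), it is escalated to MARK_CONTAINER_FOR_KILLABLE and its entry is dropped; entries that stop being selected are garbage-collected once they are older than 2 × maxWaitTime.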
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java new file mode 100644 index 0000000..e2b8beb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, and current utilization. This is a per-queue, per-partition data + * structure. + */ +public class TempQueuePerPartition { + final String queueName; + final Resource current; + final Resource pending; + final Resource guaranteed; + final Resource maxCapacity; + final Resource killable; + final String partition; + Resource idealAssigned; + Resource toBePreempted; + // For logging purpose + Resource actuallyToBePreempted; + Resource untouchableExtra; + Resource preemptableExtra; + + double normalizedGuarantee; + + final ArrayList<TempQueuePerPartition> children; + LeafQueue leafQueue; + boolean preemptionDisabled; + + TempQueuePerPartition(String queueName, Resource current, Resource pending, + Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, + String partition, Resource killable) { + this.queueName = queueName; + this.current = current; + this.pending = pending; + this.guaranteed = guaranteed; + this.maxCapacity = maxCapacity; + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyToBePreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.normalizedGuarantee = Float.NaN; + this.children = new ArrayList<>(); + this.untouchableExtra = Resource.newInstance(0, 0); + this.preemptableExtra = Resource.newInstance(0, 0); + this.preemptionDisabled = preemptionDisabled; + this.partition = partition; + this.killable = killable; + } + + public void setLeafQueue(LeafQueue l){ + assert children.size() == 0; + this.leafQueue = l; + } + + /** + * When adding a child we also aggregate its pending resource needs. + * @param q the child queue to add to this queue + */ + public void addChild(TempQueuePerPartition q) { + assert leafQueue == null; + children.add(q); + Resources.addTo(pending, q.pending); + } + + public void addChildren(ArrayList<TempQueuePerPartition> queues) { + assert leafQueue == null; + children.addAll(queues); + } + + public ArrayList<TempQueuePerPartition> getChildren(){ + return children; + } + + // This function "accepts" all the resources it can (pending) and returns + // the unused ones + Resource offer(Resource avail, ResourceCalculator rc, + Resource clusterResource) { + Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( + Resources.subtract(maxCapacity, idealAssigned), + Resource.newInstance(0, 0)); + // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + Resource accepted = + Resources.min(rc, clusterResource, + absMaxCapIdealAssignedDelta, + Resources.min(rc, clusterResource, avail, Resources.subtract( + Resources.add(current, pending), idealAssigned))); + Resource remain = Resources.subtract(avail, accepted); + Resources.addTo(idealAssigned, accepted); + return remain; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + queueName) + .append(" CUR: ").append(current) + .append(" PEN: ").append(pending) + .append(" GAR: ").append(guaranteed) + .append(" NORM: ").append(normalizedGuarantee) + .append(" IDEAL_ASSIGNED: ").append(idealAssigned) + .append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted) + .append(" UNTOUCHABLE: ").append(untouchableExtra) + .append(" PREEMPTABLE: ").append(preemptableExtra) + .append("\n"); + + return sb.toString(); + } + + public void assignPreemption(float scalingFactor, ResourceCalculator rc, + Resource clusterResource) { + if (Resources.greaterThan(rc, clusterResource, + Resources.subtract(current, killable), idealAssigned)) { + toBePreempted = Resources.multiply(Resources + .subtract(Resources.subtract(current,
killable), idealAssigned), + scalingFactor); + } else { + toBePreempted = Resource.newInstance(0, 0); + } + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", ") + .append(current.getMemory()).append(", ") + .append(current.getVirtualCores()).append(", ") + .append(pending.getMemory()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(guaranteed.getMemory()).append(", ") + .append(guaranteed.getVirtualCores()).append(", ") + .append(idealAssigned.getMemory()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemory()).append(", ") + .append(toBePreempted.getVirtualCores() ).append(", ") + .append(actuallyToBePreempted.getMemory()).append(", ") + .append(actuallyToBePreempted.getVirtualCores()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index e9129de..f56f6d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -17,38 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Deque; -import java.util.LinkedList; 
-import java.util.List; -import java.util.NavigableSet; -import java.util.Random; -import java.util.StringTokenizer; -import java.util.TreeSet; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -63,7 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -95,6 +62,38 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.TreeSet; + +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestProportionalCapacityPreemptionPolicy { static final long TS = 3141592653L; @@ -735,7 +734,7 @@ public void testContainerOrdering(){ containers.add(rm4); // sort them - ProportionalCapacityPreemptionPolicy.sortContainers(containers); + FifoPreemptionCandidatesSelectionPolicy.sortContainers(containers); // verify the "priority"-first, "reverse container-id"-second // ordering is enforced correctly
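Since sortContainers moved wholesale into the new policy class, only the call site changes in this test. The static helper stays reusable by any future selection policy in the same package, along the lines of this sketch (the app variable is assumed to be a FiCaSchedulerApp):

```java
// Order an application's live containers for preemption: reversed
// priority first, then newest (largest container id) first on ties.
List<RMContainer> live = new ArrayList<>(app.getLiveContainers());
FifoPreemptionCandidatesSelectionPolicy.sortContainers(live);
```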