diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f9b017d..80c5296 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -131,6 +131,15 @@
     RM_PREFIX + "scheduler.client.thread-count";
   public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
 
+  /** Enable preemption-related monitor threads. */
+  public static final String RM_SCHEDULER_ENABLE_PREEMPTION =
+    RM_PREFIX + "scheduler.preemption.enable";
+  public static final boolean DEFAULT_RM_SCHEDULER_ENABLE_PREEMPTION = false;
+
+  /** List of ScheduleEditPolicy classes affecting scheduler preemption. */
+  public static final String RM_SCHEDULER_PREEMPTION_POLICIES =
+    RM_PREFIX + "scheduler.preemption.policies";
+
   /** The address of the RM web application.*/
   public static final String RM_WEBAPP_ADDRESS = 
     RM_PREFIX + "webapp.address";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/PreemptableResourceScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/PreemptableResourceScheduler.java
new file mode 100644
index 0000000..772afa6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/PreemptableResourceScheduler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+/**
+ * Interface for a scheduler that supports preemption/killing.
+ */
+public interface PreemptableResourceScheduler extends ResourceScheduler {
+
+  /**
+   * If the scheduler supports container reservations, this method is used to
+   * ask the scheduler to drop the reservation for the given container.
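+   * Dropping a reservation only releases booked-but-unused space on a node;
+   * no running container is killed by this call.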
+   */
+  void dropContainerReservation(RMContainer container);
+
+  /**
+   * Ask the scheduler to reclaim the container from a specific application
+   * by issuing a preemption request
+   * @param aid the application from which we want to get a container back
+   * @param container the container we want back
+   */
+  void preemptContainer(ApplicationAttemptId aid, RMContainer container);
+
+  /**
+   * Ask the scheduler to forcibly interrupt the container given as input
+   * @param container the container to kill
+   */
+  void killContainer(RMContainer container);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContainerPreemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContainerPreemptEvent.java
new file mode 100644
index 0000000..ebda500
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContainerPreemptEvent.java
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Simple event class used to communicate container unreservation, preemption
+ * and kill requests.
+ */
+public class RMContainerPreemptEvent
+    extends AbstractEvent<RMContainerPreemptEventType> {
+
+  private final ApplicationAttemptId aid;
+  private final RMContainer container;
+
+  public RMContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
+      RMContainerPreemptEventType type) {
+    super(type);
+    this.aid = aid;
+    this.container = container;
+  }
+
+  public RMContainer getContainer() {
+    return this.container;
+  }
+
+  public ApplicationAttemptId getAppId() {
+    return aid;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.append(" ").append(getAppId());
+    sb.append(" ").append(getContainer().getContainerId());
+    return sb.toString();
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContainerPreemptEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContainerPreemptEventType.java
new file mode 100644
index 0000000..934724a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContainerPreemptEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+public enum RMContainerPreemptEventType {
+
+  DROP_RESERVATION,
+  PREEMPT_CONTAINER,
+  KILL_CONTAINER
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4cd1969..1df9e7f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,6 +39,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -46,6 +47,8 @@
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.ScheduleEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.ScheduleMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -59,6 +62,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -229,6 +233,9 @@ public synchronized void init(Configuration conf) {
     } catch (IOException ioe) {
       throw new RuntimeException("Failed to initialize scheduler", ioe);
     }
+
+    // creating monitors that handle preemption
+    createPreemptionPolicyMonitors();
 
     masterService = createApplicationMasterService();
     addService(masterService) ;
@@ -303,7 +310,8 @@ protected ResourceScheduler createScheduler() {
     } catch (ClassNotFoundException e) {
       throw new YarnException("Could not instantiate Scheduler: " + 
           schedulerClassName, e);
-    } }
+    }
+  }
 
   protected ApplicationMasterLauncher createAMLauncher() {
     return new ApplicationMasterLauncher(this.rmContext);
@@ -501,6 +509,35 @@ public void handle(RMAppEvent event) {
   }
 
   @Private
+  public static final class
+      RMContainerPreemptEventDispatcher<T extends PreemptableResourceScheduler>
+      implements EventHandler<RMContainerPreemptEvent> {
+
+    private final T scheduler;
+
+    public RMContainerPreemptEventDispatcher(ResourceScheduler scheduler) {
+      this.scheduler = (T) scheduler;
+    }
+
+    @Override
+    public void handle(RMContainerPreemptEvent event) {
+      ApplicationAttemptId aid = event.getAppId();
+      RMContainer container = event.getContainer();
+      switch (event.getType()) {
+      case DROP_RESERVATION:
+        scheduler.dropContainerReservation(container);
+        break;
+      case PREEMPT_CONTAINER:
+        scheduler.preemptContainer(aid, container);
+        break;
+      case KILL_CONTAINER:
+        scheduler.killContainer(container);
+        break;
+      }
+    }
+  }
+
+  @Private
   public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
@@ -689,7 +726,37 @@ protected ClientRMService createClientRMService() {
 
   protected ApplicationMasterService createApplicationMasterService() {
     return new ApplicationMasterService(this.rmContext, scheduler);
   }
-  
+
+  protected void createPreemptionPolicyMonitors() {
+    if (this.scheduler instanceof PreemptableResourceScheduler
+        && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_PREEMPTION,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_PREEMPTION)) {
+      LOG.info("Turning on preemption");
+      List<ScheduleEditPolicy> policies = conf.getInstances(
+          YarnConfiguration.RM_SCHEDULER_PREEMPTION_POLICIES,
+          ScheduleEditPolicy.class);
+      if (policies.size() > 0) {
+        this.rmDispatcher.register(RMContainerPreemptEventType.class,
+            new RMContainerPreemptEventDispatcher<PreemptableResourceScheduler>(
+                this.scheduler));
+        for (ScheduleEditPolicy policy : policies) {
+          LOG.info("Loading ScheduleEditPolicy: " + policy.toString());
+          policy.init(conf, this.rmContext.getDispatcher().getEventHandler(),
+              (PreemptableResourceScheduler) scheduler);
+          // preemption service, periodically checks whether we need to
+          // preempt containers to guarantee capacity constraints
+          ScheduleMonitor mon = new ScheduleMonitor(policy);
+          addService(mon);
+        }
+      } else {
+        LOG.warn("Preemption is enabled ("
+            + YarnConfiguration.RM_SCHEDULER_ENABLE_PREEMPTION
+            + ") but no policies are specified ("
+            + YarnConfiguration.RM_SCHEDULER_PREEMPTION_POLICIES + ")");
+      }
+    }
+  }
 
   protected AdminService createAdminService(
     ClientRMService clientRMService, 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/ScheduleEditPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/ScheduleEditPolicy.java
new file mode 100644
index 0000000..7573fcb
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/ScheduleEditPolicy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEvent;
+
+public interface ScheduleEditPolicy {
+
+  public void init(Configuration config,
+      EventHandler<RMContainerPreemptEvent> dispatcher,
+      PreemptableResourceScheduler scheduler);
+
+  /**
+   * This method is invoked at regular intervals. Internally the policy is
+   * allowed to track containers and affect the scheduler. The "actions"
+   * performed are passed back through an EventHandler.
+   */
+  public void editSchedule();
+
+  public long getMonitoringInterval();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/ScheduleMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/ScheduleMonitor.java
new file mode 100644
index 0000000..50d7985
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/ScheduleMonitor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ScheduleMonitor extends AbstractService {
+
+  private final ScheduleEditPolicy scheduleEditPolicy;
+  private static final Log LOG = LogFactory.getLog(ScheduleMonitor.class);
+
+  // thread that periodically invokes the configured ScheduleEditPolicy
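+  // (the invocation period is supplied by the policy itself via
+  // getMonitoringInterval())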
+  private Thread checkerThread;
+  private volatile boolean stopped;
+  private long monitorInterval;
+
+  public ScheduleMonitor(ScheduleEditPolicy scheduleEditPolicy) {
+    super("ScheduleMonitor");
+    this.scheduleEditPolicy = scheduleEditPolicy;
+    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+  }
+
+  public long getMonitorInterval() {
+    return monitorInterval;
+  }
+
+  @Override
+  public void start() {
+    assert !stopped : "starting when already stopped";
+    checkerThread = new Thread(new PreemptionChecker());
+    checkerThread.setName("Preemption Checker");
+    checkerThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    if (checkerThread != null) {
+      checkerThread.interrupt();
+    }
+    super.stop();
+  }
+
+  protected void setMonitorInterval(int monitorInterval) {
+    this.monitorInterval = monitorInterval;
+  }
+
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  @VisibleForTesting
+  public void invokePolicy() {
+    scheduleEditPolicy.editSchedule();
+  }
+
+  private class PreemptionChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (ScheduleMonitor.this) {
+          // invoke the preemption policy at a regular pace;
+          // the policy will generate preemption or kill events
+          // managed by the dispatcher
+          invokePolicy();
+        }
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info(getName() + " thread interrupted");
+          break;
+        }
+      }
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
new file mode 100644
index 0000000..a6b8d23
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.ScheduleEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ProportionalCapacityPreemptionPolicy implements ScheduleEditPolicy {
+
+  private static final Log LOG =
+    LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
+
+  public static final String OBSERVE_ONLY =
+    "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  public static final String MONITORING_INTERVAL =
+    "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+  public static final String WAIT_TIME_BEFORE_KILL =
+    "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+  public static final String TOTAL_PREEMPTION_PER_ROUND =
+    "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+  public static final String MAX_IGNORED_OVER_CAPACITY =
+    "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+  public static final String NATURAL_TERMINATION_FACTOR =
+    "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+
+  // the dispatcher to send preempt and kill events
+  public EventHandler<RMContainerPreemptEvent> dispatcher;
+
+  private final Clock clock;
+  public double maxIgnoredOverCapacity;
+  private long maxWaitTime;
+  private CapacityScheduler scheduler;
+  private long monitoringInterval;
+  private final Map<RMContainer, Long> preempted =
+    new HashMap<RMContainer, Long>();
+  private ResourceCalculator rc;
+  private float percentageClusterPreemptionAllowed;
+  private double naturalTerminationFactor;
+  private boolean observeOnly;
+
+  public ProportionalCapacityPreemptionPolicy() {
+    clock = new SystemClock();
+  }
+
+  public ProportionalCapacityPreemptionPolicy(Configuration config,
+      EventHandler<RMContainerPreemptEvent> dispatcher,
+      CapacityScheduler scheduler) {
+    this(config, dispatcher, scheduler, new SystemClock());
+  }
+
+  public ProportionalCapacityPreemptionPolicy(Configuration config,
+      EventHandler<RMContainerPreemptEvent> dispatcher,
+      CapacityScheduler scheduler, Clock clock) {
+    init(config, dispatcher, scheduler);
+    this.clock = clock;
+  }
+
+  public void init(Configuration config,
+      EventHandler<RMContainerPreemptEvent> disp,
+      PreemptableResourceScheduler sched) {
+    LOG.info("Preemption monitor: " + this.getClass().getCanonicalName());
+    assert null == scheduler : "Unexpected duplicate call to init";
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnException("Class " +
+          sched.getClass().getCanonicalName() + " not instance of " +
+          CapacityScheduler.class.getCanonicalName());
+    }
+    dispatcher = disp;
+    scheduler = (CapacityScheduler) sched;
+    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
+    naturalTerminationFactor =
+      config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
+    maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 10000);
+    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
+    percentageClusterPreemptionAllowed =
+      config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
+    observeOnly = config.getBoolean(OBSERVE_ONLY, false);
+    rc = scheduler.getResourceCalculator();
+  }
+
+  @Override
+  public void editSchedule() {
+    CSQueue root = scheduler.getRootQueue();
+    Resource clusterResources =
+      Resources.clone(scheduler.getClusterResources());
+    containerBasedPreemptOrKill(root, clusterResources);
+  }
+
+  /**
+   * This method selects and tracks the containers to be preempted.
+   * @param root the root queue of the scheduler
+   * @param clusterResources the total amount of resources in the cluster
+   */
+  private void containerBasedPreemptOrKill(CSQueue root,
+      Resource clusterResources) {
+
+    // extract a summary of the queues from the scheduler
+    Map<TempQueue, LeafQueue> queues;
+    synchronized (scheduler) {
+      queues = cloneLeafQueues(root, clusterResources);
+    }
+
+    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
+        percentageClusterPreemptionAllowed);
+
+    // compute the ideal distribution of resources among queues and
+    // update the queues accordingly
+    computeIdealResourceDistribution(rc, queues, totalPreemptionAllowed);
+
+    // select the containers to preempt in order to move the current state
+    // towards the ideal one computed above
+    Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
+      getContainersToPreempt(queues, clusterResources);
+
+    logToCSV(queues.keySet());
+
+    // if we are in observeOnly mode we do not preempt (or kill) containers
+    if (observeOnly) {
+      return;
+    }
+
+    // preempt (or kill) the containers
+    for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e
+        : toPreempt.entrySet()) {
+      for (RMContainer container : e.getValue()) {
+        // if we tried to preempt this container for more than maxWaitTime
+        if (preempted.get(container) != null &&
+            preempted.get(container) + maxWaitTime < clock.getTime()) {
+          // kill it
+          dispatcher.handle(new RMContainerPreemptEvent(e.getKey(), container,
+              RMContainerPreemptEventType.KILL_CONTAINER));
+          preempted.remove(container);
+        } else {
+          // otherwise just send preemption events
+          dispatcher.handle(new RMContainerPreemptEvent(e.getKey(), container,
+              RMContainerPreemptEventType.PREEMPT_CONTAINER));
+          if (preempted.get(container) == null) {
+            preempted.put(container, clock.getTime());
+          }
+        }
+      }
+    }
+
+    // keep the preempted list clean
+    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();) {
+      RMContainer id = i.next();
+      // if a container has been in this list for more than 2*maxWaitTime, it
+      // means the preemption policy is not seeing it as a target for
+      // preemption anymore; this could be because preemption succeeded, the
+      // task completed naturally, or capacity is now rebalanced;
+      // in any case it should be removed from this list
+      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
+        i.remove();
+      }
+    }
+  }
+
+  /**
+   * Based on the ideal allocation and the current allocation state of the
+   * queues, this method drops container reservations and selects containers
+   * for preemption from applications within each over-capacity queue.
+   *
+   * @param queues the per-queue summaries extracted from the scheduler
+   * @param clusterResource the total amount of resources in the cluster
+   * @return a map from application attempts to the containers to preempt
+   */
+  private Map<ApplicationAttemptId, Set<RMContainer>> getContainersToPreempt(
+      Map<TempQueue, LeafQueue> queues, Resource clusterResource) {
+
+    Map<ApplicationAttemptId, Set<RMContainer>> list =
+      new HashMap<ApplicationAttemptId, Set<RMContainer>>();
+
+    for (Map.Entry<TempQueue, LeafQueue> entry : queues.entrySet()) {
+      TempQueue qT = entry.getKey();
+      LeafQueue qL = entry.getValue();
+
+      // we act only if we are violating balance by more than
+      // maxIgnoredOverCapacity
+      if (Resources.greaterThan(rc, clusterResource, qT.current,
+          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+        // we introduce a dampening factor naturalTerminationFactor that
+        // accounts for natural termination of tasks
+        Resource resToObtain =
+          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+        NavigableSet<FiCaSchedulerApp> ns =
+          (NavigableSet<FiCaSchedulerApp>) qL.getApplications();
+        Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+        qT.actuallyPreempted = Resources.clone(resToObtain);
+        while (desc.hasNext()) {
+          FiCaSchedulerApp fc = desc.next();
+          if (Resources.lessThanOrEqual(rc, clusterResource,
+              resToObtain, Resources.none())) {
+            break;
+          }
+          list.put(fc.getApplicationAttemptId(),
+              preemptFrom(fc, clusterResource, resToObtain));
+        }
+      }
+    }
+    return list;
+  }
+
+  // simple printout function that reports internal queue state (for plotting)
+  private void logToCSV(Set<TempQueue> unorderedqueues) {
+    List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
+    Collections.sort(queues, new Comparator<TempQueue>() {
+      @Override
+      public int compare(TempQueue o1, TempQueue o2) {
+        return o1.queueName.compareTo(o2.queueName);
+      }});
+    String queueState = " QUEUESTATE: " + clock.getTime();
+    StringBuilder sb = new StringBuilder();
+    sb.append(queueState);
+    for (TempQueue tq : queues) {
+      sb.append(", ");
+      tq.appendLogString(sb);
+    }
+    LOG.info(sb.toString());
+  }
+
+  /**
+   * Given a target amount of resources to preempt from a specific application,
+   * select which containers to preempt (after dropping all reservations for
+   * that app).
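+   * Reservations are dropped first because they release space on a node
+   * without killing any running work.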
+   *
+   * @param app the application to preempt containers from
+   * @param clusterResource the total amount of resources in the cluster
+   * @param rsrcPreempt the amount of resources to reclaim
+   * @return the set of containers selected for preemption
+   */
+  private Set<RMContainer> preemptFrom(
+      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+    Set<RMContainer> ret = new HashSet<RMContainer>();
+    ApplicationAttemptId appId = app.getApplicationAttemptId();
+
+    // FIRST list all reservations and drop all those
+    List<RMContainer> reservations =
+      new ArrayList<RMContainer>(app.getReservedContainers());
+    for (RMContainer c : reservations) {
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+          rsrcPreempt, Resources.none())) {
+        return ret;
+      }
+      if (!observeOnly) {
+        dispatcher.handle(new RMContainerPreemptEvent(appId, c,
+            RMContainerPreemptEventType.DROP_RESERVATION));
+      }
+      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+    }
+
+    // if still needed, go through all live containers in reverse id order and
+    // preempt those
+    List<RMContainer> containers =
+      new ArrayList<RMContainer>(app.getLiveContainers());
+
+    sortContainers(containers);
+
+    for (RMContainer c : containers) {
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+          rsrcPreempt, Resources.none())) {
+        return ret;
+      }
+      ret.add(c);
+      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+    }
+
+    return ret;
+  }
+
+  /**
+   * Compare by reversed priority order first, and then by reversed
+   * containerId order.
+   * @param containers the list of containers, sorted in place
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers) {
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        Comparator<Priority> c = new org.apache.hadoop.yarn.server
+            .resourcemanager.resource.Priority.Comparator();
+        int priorityComp = c.compare(b.getContainer().getPriority(),
+            a.getContainer().getPriority());
+        if (priorityComp != 0) {
+          return priorityComp;
+        }
+        return b.getContainerId().getId() -
+            a.getContainerId().getId();
+      }
+    });
+  }
+
+  @Override
+  public long getMonitoringInterval() {
+    return monitoringInterval;
+  }
+
+  public Map<TempQueue, LeafQueue> cloneLeafQueues(CSQueue root,
+      Resource clusterResources) {
+    Map<TempQueue, LeafQueue> ret = new HashMap<TempQueue, LeafQueue>();
+    synchronized (root) {
+      if (root instanceof LeafQueue) {
+        LeafQueue l = (LeafQueue) root;
+        Resource current =
+          Resources.multiply(clusterResources, l.getAbsoluteUsedCapacity());
+        Resource pending = l.getTotalResourcePending();
+        Resource guaranteed =
+          Resources.multiply(clusterResources, l.getAbsoluteCapacity());
+        TempQueue tq =
+          new TempQueue(l.getQueueName(), current, pending, guaranteed);
+        ret.put(tq, l);
+      } else {
+        for (CSQueue c : root.getChildQueues()) {
+          ret.putAll(cloneLeafQueues(c, clusterResources));
+        }
+      }
+    }
+    return ret;
+  }
+
+  static class TempQueue {
+    final String queueName;
+    final Resource current;
+    final Resource pending;
+    final Resource guaranteed;
+    Resource idealAssigned;
+    Resource toBePreempted;
+    Resource actuallyPreempted;
+
+    double normalizedGuarantee;
+
+    TempQueue(String queueName, Resource current, Resource pending,
+        Resource guaranteed) {
+      this.queueName = queueName;
+      this.current = current;
+      this.pending = pending;
+      this.guaranteed = guaranteed;
+      this.idealAssigned = BuilderUtils.newResource(0, 0);
+      this.actuallyPreempted = BuilderUtils.newResource(0, 0);
+      this.toBePreempted = BuilderUtils.newResource(0, 0);
+      this.normalizedGuarantee = Float.NaN;
+    }
+
+    Resource offer(Resource avail, ResourceCalculator rc,
+        Resource clusterResource) {
+      // remain = avail - min(avail, current + pending - assigned)
+      Resource accepted = Resources.min(rc, clusterResource,
+          avail,
+          Resources.subtract(
+              Resources.add(current, pending),
+              idealAssigned));
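+      // anything beyond what the queue can absorb (current + pending) is
+      // handed back so it can be offered to the other queues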
+      Resource remain = Resources.subtract(avail, accepted);
+      Resources.addTo(idealAssigned, accepted);
+      return remain;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("CUR: ").append(current)
+        .append(" PEN: ").append(pending)
+        .append(" GAR: ").append(guaranteed)
+        .append(" NORM: ").append(normalizedGuarantee)
+        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
+        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+
+      return sb.toString();
+    }
+
+    public void assignPreemption(float scalingFactor,
+        ResourceCalculator rc, Resource clusterResource) {
+      if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
+        toBePreempted = Resources.multiply(
+            Resources.subtract(current, idealAssigned), scalingFactor);
+      } else {
+        toBePreempted = BuilderUtils.newResource(0, 0);
+      }
+    }
+
+    void appendLogString(StringBuilder sb) {
+      sb.append(queueName).append(", ")
+        .append(current.getMemory()).append(", ")
+        .append(current.getVirtualCores()).append(", ")
+        .append(pending.getMemory()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(guaranteed.getMemory()).append(", ")
+        .append(guaranteed.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemory()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemory()).append(", ")
+        .append(toBePreempted.getVirtualCores()).append(", ")
+        .append(actuallyPreempted.getMemory()).append(", ")
+        .append(actuallyPreempted.getVirtualCores());
+    }
+
+  }
+
+  public void computeIdealResourceDistribution(ResourceCalculator rc,
+      Map<TempQueue, LeafQueue> queues, Resource totalPreemptionAllowed) {
+    Resource tot_pending = BuilderUtils.newResource(0, 0);
+    Resource tot_current = BuilderUtils.newResource(0, 0);
+    Resource tot_guarant = BuilderUtils.newResource(0, 0);
+    for (TempQueue q : queues.keySet()) {
+      tot_pending = Resources.add(tot_pending, q.pending);
+      tot_current = Resources.add(tot_current, q.current);
+      tot_guarant = Resources.add(tot_guarant, q.guaranteed);
+    }
+
+    List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues.keySet());
+    Resource unassigned = Resources.clone(tot_guarant);
+
+    // assign all cluster resources
+    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+        unassigned, Resources.none())) {
+      Resource wQassigned = BuilderUtils.newResource(0, 0);
+
+      // based on normalizedGuarantees
+      resetCapacity(rc, unassigned, qAlloc);
+      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+        TempQueue sub = i.next();
+        Resource wQavail =
+          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+        if (!Resources.greaterThan(rc, tot_guarant,
+            wQdone, Resources.none())) {
+          i.remove();
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+
+    // compute needed preemption
+    Resource totPreemptionNeeded = BuilderUtils.newResource(0, 0);
+    for (TempQueue t : queues.keySet()) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded,
+            Resources.subtract(t.current, t.idealAssigned));
+      }
+    }
+
+    // if we need to preempt more than is allowed, compute a factor (0<f<1)
+    // that scales down how much we ask back from each queue
+    float scalingFactor = 1.0F;
+    if (Resources.greaterThan(rc, tot_guarant,
+        totPreemptionNeeded, totalPreemptionAllowed)) {
+      scalingFactor = Resources.divide(rc, tot_guarant,
+          totalPreemptionAllowed, totPreemptionNeeded);
+    }
+
+    // assign to each queue the (scaled) amount it should yield back
+    for (TempQueue t : queues.keySet()) {
+      t.assignPreemption(scalingFactor, rc, tot_guarant);
+    }
+  }
+
+  // computes the normalizedGuarantee of each queue as its share of the sum
+  // of the guarantees of the queues still being allocated
+  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
+      List<TempQueue> queues) {
+    Resource activeCap = BuilderUtils.newResource(0, 0);
+    for (TempQueue q : queues) {
+      Resources.addTo(activeCap, q.guaranteed);
+    }
+    for (TempQueue q : queues) {
+      q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+          q.guaranteed, activeCap);
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 645d34b..cb0f653 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -176,6 +177,7 @@
       }
     }
   }
+
   
   synchronized public Collection<Priority> getPriorities() {
     return priorities;
@@ -185,6 +187,13 @@
       Priority priority) {
     return requests.get(priority);
   }
+
+  synchronized public List<ResourceRequest> getAllResourceRequests() {
+    List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
+    for (Map<String, ResourceRequest> r : requests.values()) {
+      ret.addAll(r.values());
+    }
+    return ret;
+  }
 
   synchronized public ResourceRequest getResourceRequest(Priority priority,
       String nodeAddress) {
@@ -331,6 +340,7 @@
     }
   }
 
+
   synchronized private void allocate(Container container) {
     // Update consumption and track allocations
     //TODO: fixme sharad
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index aca2a12..fa2b885 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -63,7 +64,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import
 org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -81,8 +81,9 @@
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler
-implements ResourceScheduler, CapacitySchedulerContext, Configurable {
+public class CapacityScheduler
+  implements PreemptableResourceScheduler, CapacitySchedulerContext,
+    Configurable {
 
 private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
@@ -537,7 +538,8 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
 
     return new Allocation(
         application.pullNewlyAllocatedContainers(), 
-        application.getHeadroom());
+        application.getHeadroom(),
+        application.pullContainerToPreempt());
 
   }
 }
@@ -769,7 +771,8 @@ private synchronized void completedContainer(RMContainer rmContainer,
     Container container = rmContainer.getContainer();
     
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+    ApplicationAttemptId applicationAttemptId =
+      container.getId().getApplicationAttemptId();
     FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
@@ -826,5 +829,44 @@ public SchedulerNodeReport getNodeReport(NodeId nodeId) {
     FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
-  
+
+  @Override
+  @Lock(CapacityScheduler.class)
+  public void dropContainerReservation(RMContainer container) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DROP_RESERVATION: " + container.toString());
+    }
+    completedContainer(container,
+        SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(),
+            SchedulerUtils.UNRESERVED_CONTAINER),
+        RMContainerEventType.KILL);
+  }
+
+  @Override
+  @Lock(Lock.NoLock.class)
+  public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PREEMPT_CONTAINER: application: " + aid.toString() +
+          " container: " + cont.toString());
+    }
+    FiCaSchedulerApp app = applications.get(aid);
+    if (app != null) {
+      app.addPreemptContainer(cont.getContainerId());
+    }
+  }
+
+  @Override
+  @Lock(CapacityScheduler.class)
+  public void killContainer(RMContainer cont) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("KILL_CONTAINER: container " + cont.toString());
+    }
+    completedContainer(cont,
+        SchedulerUtils.createAbnormalContainerStatus(
+            cont.getContainerId(), "Container being forcibly preempted: "
+                + cont.getContainerId()),
+        RMContainerEventType.KILL);
+  }
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 58dcb73..af24cd1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -333,6 +333,7 @@
   public synchronized float getAbsoluteCapacity() {
     return absoluteCapacity;
   }
 
+  @Override
   public synchronized float getMaximumCapacity() {
     return maximumCapacity;
@@ -1362,18 +1363,20 @@ private void reserve(FiCaSchedulerApp application, Priority priority,
     node.reserveResource(application, priority, rmContainer);
   }
 
-  private void unreserve(FiCaSchedulerApp application, Priority priority, 
+  private boolean unreserve(FiCaSchedulerApp application, Priority priority, 
       FiCaSchedulerNode node, RMContainer rmContainer) {
     // Done with the reservation?
-    application.unreserve(node, priority);
-    node.unreserveResource(application);
-
+    if (application.unreserve(node, priority)) {
+      node.unreserveResource(application);
+
       // Update reserved metrics
-    getMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+      getMetrics().unreserveResource(
+          application.getUser(), rmContainer.getContainer().getResource());
+      return true;
+    }
+    return false;
   }
 
-
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
@@ -1383,37 +1386,39 @@
     synchronized (this) {
 
       Container container = rmContainer.getContainer();
-      
+
+      boolean removed = false;
       // Inform the application & the node
       // Note: It's safe to assume that all state changes to RMContainer
       // happen under scheduler's lock... 
       // So, this is, in effect, a transaction across application & node
       if (rmContainer.getState() == RMContainerState.RESERVED) {
-        unreserve(application, rmContainer.getReservedPriority(), 
+        removed = unreserve(application, rmContainer.getReservedPriority(), 
             node, rmContainer);
       } else {
-        application.containerCompleted(rmContainer, containerStatus, event);
+        removed = application.containerCompleted(rmContainer, containerStatus, event);
         node.releaseContainer(container);
       }
 
-      // Book-keeping
-      releaseResource(clusterResource, 
-          application, container.getResource());
-
-      LOG.info("completedContainer" +
-          " container=" + container +
-          " resource=" + container.getResource() +
-          " queue=" + this + 
-          " usedCapacity=" + getUsedCapacity() +
-          " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-          " used=" + usedResources + 
-          " cluster=" + clusterResource);
+      if (removed) {
+        releaseResource(clusterResource,
+            application, container.getResource());
+        LOG.info("completedContainer" +
+            " container=" + container +
+            " resource=" + container.getResource() +
+            " queue=" + this +
+            " usedCapacity=" + getUsedCapacity() +
+            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
+            " used=" + usedResources +
+            " cluster=" + clusterResource);
+        // Inform the parent queue
+        getParent().completedContainer(clusterResource, application, 
+            node, rmContainer, null, event);
+      }
     }
-
-    // Inform the parent queue
-    getParent().completedContainer(clusterResource, application, 
-        node, rmContainer, null, event);
   }
@@ -1560,5 +1565,19 @@
     getParent().recoverContainer(clusterResource, application, container);
   }
-  
+
+  // need to access the list of apps from the preemption monitor
+  public Set<FiCaSchedulerApp> getApplications() {
+    return activeApplications;
+  }
+
+  // return a single Resource capturing the overall amount of pending resources
+  public Resource getTotalResourcePending() {
+    Resource ret = BuilderUtils.newResource(0, 0);
+    for (FiCaSchedulerApp f : activeApplications) {
+      Resources.addTo(ret, f.getTotalPendingRequests());
+    }
+    return ret;
+  }
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f7a8f7c..e8a5f8e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -20,9 +20,12 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +60,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -84,16 +88,19 @@
 
   private Resource resourceLimit = recordFactory
       .newRecordInstance(Resource.class); 
-  private Map<ContainerId, RMContainer> liveContainers
-    = new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers = 
-    new ArrayList<RMContainer>();
+  private Map<ContainerId, RMContainer> liveContainers =
+    new HashMap<ContainerId, RMContainer>();
+  private List<RMContainer> newlyAllocatedContainers =
+    new ArrayList<RMContainer>();
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
     new HashMap<Priority, Map<NodeId, RMContainer>>();
   
   private boolean isStopped = false;
+
+  private final Set<ContainerId> containerToPreempt =
+    new HashSet<ContainerId>();
   
   /**
    * Count how many times the application has been given an opportunity
@@ -212,12 +219,17 @@ public synchronized void containerLaunchedOnNode(ContainerId containerId,
         RMContainerEventType.LAUNCHED));
   }
 
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  synchronized public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
+
+    // Remove from the list of containers
+    if (null == liveContainers.remove(rmContainer.getContainerId())) {
+      return false;
+    }
+
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
-    
+
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -228,8 +240,8 @@
     LOG.info("Completed container: " + rmContainer.getContainerId() + 
         " in state: " + rmContainer.getState() + " event:" + event);
 
-    // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+
+    containerToPreempt.remove(rmContainer.getContainerId());
 
     RMAuditLogger.logSuccess(getUser(), 
         AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
@@ -239,6 +251,8 @@
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    return true;
   }
 
   synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
@@ -407,23 +421,36 @@ public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priorit
     return rmContainer;
   }
 
-  public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-      this.reservedContainers.get(priority);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
-    if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(priority);
-    }
+  public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
+        .get(priority);
 
-    // Reset the re-reservation count
-    resetReReservations(priority);
-
-    Resource resource = reservedContainer.getContainer().getResource();
-    Resources.subtractFrom(currentReservation, resource);
-
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
+    if (reservedContainers != null) {
+      RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+
+      // unreserve is now triggered in new scenarios (preemption);
+      // as a consequence reservedContainer might be null, adding NP-checks
+      if (reservedContainer != null
+          && reservedContainer.getContainer() != null
+          && reservedContainer.getContainer().getResource() != null) {
+
+        if (reservedContainers.isEmpty()) {
+          this.reservedContainers.remove(priority);
+        }
+        // Reset the re-reservation count
+        resetReReservations(priority);
+
+        Resource resource = reservedContainer.getContainer().getResource();
+        Resources.subtractFrom(currentReservation, resource);
+
+        LOG.info("Application " + getApplicationId() + " unreserved "
+            + " on node " + node + ", currently has "
+            + reservedContainers.size() + " at priority " + priority
+            + "; currentReservation " + currentReservation);
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
@@ -487,4 +514,36 @@
   public Queue getQueue() {
     return queue;
   }
+
+  public Resource getTotalPendingRequests() {
+    Resource ret = BuilderUtils.newResource(0, 0);
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      // to avoid double counting we count only "ANY" resource requests
+      if (ResourceRequest.isAnyLocation(rr.getHostName())) {
+        Resources.addTo(ret,
+            Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * @return the set of containers requested back in this round.
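+   * The set is cleared once returned, so each container is requested back
+   * at most once per round.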
+   */
+  public synchronized Set<ContainerId> pullContainerToPreempt() {
+    Set<ContainerId> ret = new HashSet<ContainerId>(containerToPreempt);
+    containerToPreempt.clear();
+    return Collections.unmodifiableSet(ret);
+  }
+
+  public synchronized void addPreemptContainer(ContainerId cont) {
+    // ignore already completed containers
+    if (liveContainers.containsKey(cont)) {
+      containerToPreempt.add(cont);
+    }
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index f6bac00..776e8ac 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -142,8 +142,9 @@ public synchronized void releaseContainer(Container container) {
     }
 
     /* remove the containers from the nodemanger */
-    launchedContainers.remove(container.getId());
-    updateResource(container);
+    if (null != launchedContainers.remove(container.getId())) {
+      updateResource(container);
+    }
 
     LOG.info("Released container " + container.getId() + 
         " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
@@ -226,18 +227,25 @@
 
   public synchronized void unreserveResource(
       SchedulerApplication application) {
-    // Cannot unreserve for wrong application...
-    ApplicationAttemptId reservedApplication = 
-        reservedContainer.getContainer().getId().getApplicationAttemptId(); 
-    if (!reservedApplication.equals(
-        application.getApplicationAttemptId())) {
-      throw new IllegalStateException("Trying to unreserve " +  
-          " for application " + application.getApplicationAttemptId() + 
-          " when currently reserved " + 
-          " for application " + reservedApplication.getApplicationId() + 
-          " on node " + this);
-    }
+
+    // adding NP checks as this can now be called for preemption
+    if (reservedContainer != null
+        && reservedContainer.getContainer() != null
+        && reservedContainer.getContainer().getId() != null
+        && reservedContainer.getContainer().getId()
+            .getApplicationAttemptId() != null) {
+
+      // Cannot unreserve for wrong application...
+      ApplicationAttemptId reservedApplication =
+          reservedContainer.getContainer().getId().getApplicationAttemptId();
+      if (!reservedApplication.equals(
+          application.getApplicationAttemptId())) {
+        throw new IllegalStateException("Trying to unreserve " +
+            " for application " + application.getApplicationAttemptId() +
+            " when currently reserved " +
+            " for application " + reservedApplication.getApplicationId() +
+            " on node " + this);
+      }
+    }
     reservedContainer = null;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
new file mode 100644
index 0000000..9287bd0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -0,0 +1,417 @@
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import static org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEventType.*;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.*;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import static org.mockito.Mockito.*;
+
+public class TestProportionalCapacityPreemptionPolicy {
+
+  static final long TS = 3141592653L;
+
+  Random rand = null;
+  Clock mClock = null;
+  Configuration conf = null;
+  CapacityScheduler mCS = null;
+  EventHandler<RMContainerPreemptEvent> mDisp = null;
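+  // mClock, mCS and mDisp are Mockito mocks (see setup()); only the
+  // preemption policy logic itself runs for real in these tests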
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
new file mode 100644
index 0000000..9287bd0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -0,0 +1,417 @@
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import static org.apache.hadoop.yarn.server.resourcemanager.RMContainerPreemptEventType.*;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.*;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import static org.mockito.Mockito.*;
+
+public class TestProportionalCapacityPreemptionPolicy {
+
+  static final long TS = 3141592653L;
+
+  Random rand = null;
+  Clock mClock = null;
+  Configuration conf = null;
+  CapacityScheduler mCS = null;
+  EventHandler<RMContainerPreemptEvent> mDisp = null;
+  ResourceCalculator rc = new DefaultResourceCalculator();
+  RecordFactory rf = RecordFactoryProvider.getRecordFactory(null);
+  final ApplicationAttemptId appA = BuilderUtils.newApplicationAttemptId(
+      BuilderUtils.newApplicationId(TS, 0), 0);
+  final ApplicationAttemptId appB = BuilderUtils.newApplicationAttemptId(
+      BuilderUtils.newApplicationId(TS, 1), 0);
+  final ApplicationAttemptId appC = BuilderUtils.newApplicationAttemptId(
+      BuilderUtils.newApplicationId(TS, 2), 0);
+  final ApplicationAttemptId appD = BuilderUtils.newApplicationAttemptId(
+      BuilderUtils.newApplicationId(TS, 3), 0);
+  final ArgumentCaptor<RMContainerPreemptEvent> evtCaptor =
+      ArgumentCaptor.forClass(RMContainerPreemptEvent.class);
+
+  @Rule public TestName name = new TestName();
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() {
+    conf = new Configuration(false);
+    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(MONITORING_INTERVAL, 3000);
+    // report "ideal" preempt
+    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
+    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
+
+    mClock = mock(Clock.class);
+    mCS = mock(CapacityScheduler.class);
+    when(mCS.getResourceCalculator()).thenReturn(rc);
+    mDisp = mock(EventHandler.class);
+    rand = new Random();
+    long seed = rand.nextLong();
+    System.out.println(name.getMethodName() + " SEED: " + seed);
+    rand.setSeed(seed);
+  }
+
+  @Test
+  public void testIgnore() {
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      {  0, 60, 40 },  // used
+      {  0,  0,  0 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  1 },  // apps
+      {  1,  1,  1 },  // req granularity
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // don't correct imbalances without demand
+    verify(mDisp, never()).handle(isA(RMContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testProportionalPreemption() {
+    int[][] qData = new int[][]{
+      //  A   B   C   D
+      { 10, 40, 20, 30 },  // abs
+      { 30, 60, 10,  0 },  // used
+      { 20,  5, 20,  0 },  // pending
+      {  0,  0,  0,  0 },  // reserved
+      {  1,  1,  1,  0 },  // apps
+      {  1,  1,  1,  1 },  // req granularity
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
+  public void testPreemptCycle() {
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      {  0, 60, 40 },  // used
+      { 10,  0,  0 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  1 },  // apps
+      {  1,  1,  1 },  // req granularity
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // ensure all pending rsrc from A get preempted from other queues
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
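+  // The next test walks the preempt-to-kill escalation driven by
+  // WAIT_TIME_BEFORE_KILL: targets are first asked to vacate via
+  // PREEMPT_CONTAINER events, the requests are re-issued on later policy
+  // passes, and containers still alive past the deadline get KILL_CONTAINER.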
+  @Test
+  public void testExpireKill() {
+    final long killTime = 10000L;
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      {  0, 60, 40 },  // used
+      { 10,  0,  0 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  1 },  // apps
+      {  1,  1,  1 },  // req granularity
+    };
+    conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+
+    // ensure all pending rsrc from A get preempted from other queues
+    when(mClock.getTime()).thenReturn(0L);
+    policy.editSchedule();
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // requests reiterated
+    when(mClock.getTime()).thenReturn(killTime / 2);
+    policy.editSchedule();
+    verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // kill req sent
+    when(mClock.getTime()).thenReturn(killTime + 1);
+    policy.editSchedule();
+    verify(mDisp, times(30)).handle(evtCaptor.capture());
+    List<RMContainerPreemptEvent> events = evtCaptor.getAllValues();
+    for (RMContainerPreemptEvent e : events.subList(20, 30)) {
+      assertEquals(appC, e.getAppId());
+      assertEquals(KILL_CONTAINER, e.getType());
+    }
+  }
+
+  @Test
+  public void testDeadzone() {
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      { 39, 43, 21 },  // used
+      { 10,  0,  0 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  1 },  // apps
+      {  1,  1,  1 },  // req granularity
+    };
+    conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // ignore 10% overcapacity to avoid jitter
+    verify(mDisp, never()).handle(isA(RMContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testOverCapacityImbalance() {
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      { 55, 45,  0 },  // used
+      { 10, 10,  0 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  0 },  // apps
+      {  1,  1,  0 },  // req granularity
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // correct imbalance between over-capacity queues
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
+  public void testNaturalTermination() {
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      { 55, 45,  0 },  // used
+      { 10, 10,  0 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  0 },  // apps
+      {  1,  1,  0 },  // req granularity
+    };
+    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // ignore 10% imbalance between over-capacity queues
+    verify(mDisp, never()).handle(isA(RMContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testObserveOnly() {
+    int[][] qData = new int[][]{
+      //  A   B   C
+      { 40, 40, 20 },  // abs
+      { 90, 10,  0 },  // used
+      { 10, 20, 50 },  // pending
+      {  0,  0,  0 },  // reserved
+      {  1,  1,  0 },  // apps
+      {  1,  1,  0 },  // req granularity
+    };
+    conf.setBoolean(OBSERVE_ONLY, true);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify even severe imbalance not affected
+    verify(mDisp, never()).handle(isA(RMContainerPreemptEvent.class));
+  }
+
+
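+  // The deadzone, natural-termination, and observe-only tests above cover
+  // the policy's damping knobs: MAX_IGNORED_OVER_CAPACITY tolerates a dead
+  // zone around each queue's target, NATURAL_TERMINATION_FACTOR leaves some
+  // imbalance to natural container completion, and OBSERVE_ONLY computes an
+  // ideal allocation without dispatching any events.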
+  @Test
+  public void testContainerOrdering() {
+
+    List<RMContainer> containers = new ArrayList<RMContainer>();
+
+    ApplicationAttemptId appAttId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(TS, 10), 0);
+
+    // create a set of containers
+    RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3);
+    RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3);
+    RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2);
+    RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2);
+    RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1);
+
+    // insert them in non-sorted order
+    containers.add(rm3);
+    containers.add(rm2);
+    containers.add(rm1);
+    containers.add(rm5);
+    containers.add(rm4);
+
+    // sort them
+    ProportionalCapacityPreemptionPolicy.sortContainers(containers);
+
+    // verify the "priority"-first, "reverse container-id"-second
+    // ordering is enforced correctly
+    assert containers.get(0).equals(rm1);
+    assert containers.get(1).equals(rm2);
+    assert containers.get(2).equals(rm3);
+    assert containers.get(3).equals(rm4);
+    assert containers.get(4).equals(rm5);
+
+  }
+
+  static class IsPreemptionRequestFor
+      extends ArgumentMatcher<RMContainerPreemptEvent> {
+    private final ApplicationAttemptId appAttId;
+    private final RMContainerPreemptEventType type;
+    IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
+      this(appAttId, PREEMPT_CONTAINER);
+    }
+    IsPreemptionRequestFor(ApplicationAttemptId appAttId,
+        RMContainerPreemptEventType type) {
+      this.appAttId = appAttId;
+      this.type = type;
+    }
+    @Override
+    public boolean matches(Object o) {
+      return appAttId.equals(((RMContainerPreemptEvent)o).getAppId())
+          && type.equals(((RMContainerPreemptEvent)o).getType());
+    }
+    @Override
+    public String toString() {
+      return appAttId.toString();
+    }
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
+    ProportionalCapacityPreemptionPolicy policy =
+        new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+    CSQueue mRoot = buildMockRootQueue(rand, qData);
+    when(mCS.getRootQueue()).thenReturn(mRoot);
+
+    Resource clusterResources = BuilderUtils.newResource(sum(qData[0]), 0);
+    when(mCS.getClusterResources()).thenReturn(clusterResources);
+    return policy;
+  }
+
+  CSQueue buildMockRootQueue(Random r, int[]... queueData) {
+    int[] abs      = queueData[0];
+    int[] used     = queueData[1];
+    int[] pending  = queueData[2];
+    int[] reserved = queueData[3];
+    int[] apps     = queueData[4];
+    int[] gran     = queueData[5];
+    assert 26 > queueData[0].length;
+    for (int i = 0; i < queueData[0].length - 1; ++i) {
+      assert queueData[i].length == queueData[i+1].length;
+      // simplifying assumption: all requests/rsrc evenly distributed
+      // across applications, all requests in queue uniform size
+      if (apps[i] != 0) {
+        assert 0 == pending[i] % apps[i];
+        assert 0 == reserved[i] % apps[i];
+      }
+      assert 0 == used[i] % gran[i];
+      assert 0 == pending[i] % gran[i];
+      assert 0 == reserved[i] % gran[i];
+    }
+    float tot = sum(abs);
+    List<CSQueue> lqs = new ArrayList<CSQueue>();
+    int appAlloc = 0;
+    for (int i = 0; i < abs.length; ++i) {
+      String queueName = "queue" + (char)('A' + i);
+      LeafQueue lq = mock(LeafQueue.class);
+      when(lq.getQueueName()).thenReturn(queueName);
+      when(lq.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
+      when(lq.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+      when(lq.getTotalResourcePending()).thenReturn(
+          BuilderUtils.newResource(pending[i], 0));
+      // consider moving where CapacityScheduler::comparator accessible
+      NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
+          new Comparator<FiCaSchedulerApp>() {
+            @Override
+            public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+              return a1.getApplicationAttemptId()
+                  .compareTo(a2.getApplicationAttemptId());
+            }
+          });
+      if (apps[i] != 0) {
+        int aUsed    = used[i] / apps[i];
+        int aPending = pending[i] / apps[i];
+        int aReserve = reserved[i] / apps[i];
+        for (int a = 0; a < apps[i]; ++a) {
+          qApps.add(mockApp(appAlloc, aUsed, aPending, aReserve, gran[i]));
+          ++appAlloc;
+        }
+      }
+      when(lq.getApplications()).thenReturn(qApps);
+      lqs.add(lq);
+    }
+    CSQueue root = mock(CSQueue.class);
+    when(root.getChildQueues()).thenReturn(lqs);
+    return root;
+  }
+
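+  // mockApp below fabricates a single application: `used` becomes live mock
+  // containers and `reserved` becomes reserved mock containers, each carved
+  // into `gran`-sized units; the enclosing ApplicationAttemptId keeps the
+  // generated ContainerIds unique across applications.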
+  FiCaSchedulerApp mockApp(int id, int used, int pending, int reserved,
+      int gran) {
+    FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+
+    ApplicationAttemptId appAttId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(TS, id), 0);
+    when(app.getApplicationAttemptId()).thenReturn(appAttId);
+
+    int cAlloc = 0;
+    Resource unit = BuilderUtils.newResource(gran, 0);
+    List<RMContainer> cReserved = new ArrayList<RMContainer>();
+    for (int i = 0; i < reserved; i += gran) {
+      cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
+      ++cAlloc;
+    }
+    when(app.getReservedContainers()).thenReturn(cReserved);
+
+    List<RMContainer> cLive = new ArrayList<RMContainer>();
+    for (int i = 0; i < used; i += gran) {
+      cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      ++cAlloc;
+    }
+    when(app.getLiveContainers()).thenReturn(cLive);
+    return app;
+  }
+
+  RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
+      Resource r, int priority) {
+    ContainerId cId = BuilderUtils.newContainerId(rf, appAttId, id);
+    Container c = mock(Container.class);
+    when(c.getResource()).thenReturn(r);
+    when(c.getPriority()).thenReturn(Priority.create(priority));
+    RMContainer mC = mock(RMContainer.class);
+    when(mC.getContainerId()).thenReturn(cId);
+    when(mC.getContainer()).thenReturn(c);
+
+    return mC;
+  }
+
+  static int sum(int... ii) {
+    int ret = 0;
+    for (int i : ii) {
+      ret += i;
+    }
+    return ret;
+  }
+
+}
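As a companion to testContainerOrdering, the standalone sketch below shows a comparator consistent with the asserted "priority"-first, "reverse container-id"-second order. It is an illustration with hypothetical types (plain ints in place of RMContainer), not the patch's actual sortContainers implementation.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class PreemptionOrderSketch {
  static final class C {
    final int id;
    final int priority;
    C(int id, int priority) { this.id = id; this.priority = priority; }
  }

  // "priority"-first, "reverse container-id"-second, as the test asserts
  static void sort(List<C> candidates) {
    Collections.sort(candidates, new Comparator<C>() {
      @Override
      public int compare(C a, C b) {
        if (a.priority != b.priority) {
          return b.priority - a.priority;  // higher priority value first
        }
        return b.id - a.id;                // then higher (newer) id first
      }
    });
  }

  public static void main(String[] args) {
    // same (id, priority) pairs as rm1..rm5 in testContainerOrdering
    List<C> cs = new ArrayList<C>(Arrays.asList(
        new C(2, 2), new C(3, 3), new C(5, 3), new C(4, 1), new C(1, 2)));
    sort(cs);
    for (C c : cs) {
      System.out.println("id=" + c.id + " priority=" + c.priority);
    }
    // prints ids 5, 3, 2, 1, 4 -- the rm1..rm5 order the test expects
  }
}

Within a priority level the higher, more recently assigned container ids come first, presumably so that preemption reclaims the youngest containers and throws away the least completed work.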