diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..a55e434
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerConfigEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.DynamicQueueConf;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class implements a {@link PlanFollower}. It is invoked on a timer, and
+ * is in charge of publishing the state of the {@link Plan}s to the underlying
+ * {@link CapacityScheduler}. This implementation does so by adding, removing,
+ * and resizing leaf queues in the scheduler, thus affecting the dynamic
+ * behavior of the scheduler in a way that is consistent with the content of
+ * the plan. It also updates the plan's view of how many resources are
+ * available in the cluster.
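+ * <p>
+ * For example, a reservation that is currently entitled to a tenth of the
+ * plan's resources is published as a dynamic leaf queue whose capacity is
+ * set to 10% of the plan queue's capacity.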
+ *
+ * This implementation of PlanFollower is relatively stateless, and it can
+ * synchronize schedulers and Plans that have arbitrary changes (performing
+ * set differences among existing queues). This makes it resilient to the
+ * frequency of synchronization and to RM restarts (no "catch up" is
+ * necessary).
+ */
+public class CapacitySchedulerPlanFollower implements PlanFollower {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CapacitySchedulerPlanFollower.class);
+
+  private Collection<Plan> plans = new ArrayList<Plan>();
+
+  private Clock clock;
+  private CapacityScheduler scheduler;
+  // tracks in-flight app moves per plan, keyed by plan queue name
+  private Map<String, Set<String>> drainedReservations =
+      new HashMap<String, Set<String>>();
+
+  @Override
+  public void init(Clock clock, ResourceScheduler sched,
+      Collection<Plan> plans) {
+    LOG.info("Initializing Plan Follower Policy:"
+        + this.getClass().getCanonicalName());
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class "
+          + sched.getClass().getCanonicalName() + " not instance of "
+          + CapacityScheduler.class.getCanonicalName());
+    }
+    this.clock = clock;
+    this.scheduler = (CapacityScheduler) sched;
+    this.plans.addAll(plans);
+  }
+
+  @Override
+  public synchronized void run() {
+    for (Plan plan : plans) {
+      synchronizePlan(plan);
+    }
+  }
+
+  @Override
+  public void synchronizePlan(Plan plan) {
+    String planQueueName = plan.getQueueName();
+    LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
+    // align with plan step
+    long step = plan.getStep();
+    long now = clock.getTime();
+    if (now % step != 0) {
+      now += step - (now % step);
+    }
+    synchronized (scheduler) {
+      CSQueue planQueue = scheduler.getQueue(planQueueName);
+      if (!(planQueue instanceof PlanQueue)) {
+        LOG.error("The queue for plan {} is not a PlanQueue!", planQueueName);
+        return;
+      }
+      // first we publish to the plan the current availability of resources
+      Resource clusterResources = scheduler.getClusterResource();
+      float planAbsCap = planQueue.getAbsoluteCapacity();
+      Resource planResources =
+          Resources.multiply(clusterResources, planAbsCap);
+      plan.setTotalCapacity(planResources);
+
+      Set<ReservationAllocation> currentReservations =
+          plan.getReservationsAtTime(now);
+      Set<String> curReservationNames = new HashSet<String>();
+      Resource allocatedCapacity = Resource.newInstance(0, 0);
+      int numSes = 0;
+      if (currentReservations != null) {
+        numSes = currentReservations.size();
+        for (ReservationAllocation reservation : currentReservations) {
+          curReservationNames.add(reservation.getReservationId().toString());
+          Resources.addTo(allocatedCapacity,
+              reservation.getResourcesAtTime(now));
+        }
+      }
+      // if the resources dedicated to this plan shrink, invoke the replanner
+      String defPlanQName = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+      if (Resources.greaterThan(scheduler.getResourceCalculator(),
+          clusterResources, allocatedCapacity, planResources)) {
+        try {
+          plan.getReplanner().plan(plan, null);
+        } catch (PlanningException e) {
+          LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+        }
+      }
+      // add the default queue if it doesn't exist
+      if (scheduler.getQueue(defPlanQName) == null) {
+        ReservationQueue resQueue =
+            new ReservationQueue(scheduler, defPlanQName,
+                ((PlanQueue) planQueue));
+        try {
+          scheduler.addQueue(resQueue);
+        } catch (SchedulerConfigEditException e) {
+          LOG.warn("Exception while trying to add default queue for plan: {}",
+              planQueueName, e);
+        }
+      }
+      curReservationNames.add(defPlanQName);
+      // identify the reservations that have expired and new reservations
+      // that have to be activated
+      Set<String> previousReservations =
+          scheduler.getReservationQueueNames(planQueueName);
+      Set<String> expired =
+          Sets.difference(previousReservations, curReservationNames);
+      Set<String> toAdd =
+          Sets.difference(curReservationNames, previousReservations);
+      // determine newly expired reservations to move to the parent's default
+      // queue
+      Set<String> toRemove = null;
+      boolean isMove = plan.getMoveOnExpiry();
+      if (isMove) {
+        Set<String> toMove = null;
+        Set<String> drained = drainedReservations.remove(planQueueName);
+        if (drained == null) {
+          toRemove = new HashSet<String>();
+          toMove = new HashSet<String>(expired);
+        } else {
+          toRemove = new HashSet<String>(drained);
+          toMove = new HashSet<String>(Sets.difference(expired, drained));
+        }
+        // move all the apps in these queues to defPlanQName
+        moveAppsInQueues(toMove, defPlanQName);
+        // moveAll is async, so we need to track in-flight moves (before
+        // killing)
+        drainedReservations.put(planQueueName, toMove);
+      } else {
+        toRemove = new HashSet<String>(expired);
+      }
+
+      // kill all the apps in these queues
+      killAppsInQueues(toRemove);
+
+      // Set the entitlement for all expired queues to zero. We might not be
+      // able to remove expired queues the first time around if the MOVE and
+      // KILL events have not been handled yet. This will work correctly at
+      // some future invocation of the PlanFollower and converge to a correct
+      // state.
+      for (String resToResize : expired) {
+        // reduce entitlement to 0 and remove
+        try {
+          scheduler.setEntitlement(resToResize,
+              new DynamicQueueConf(0.0f, 0.0f));
+        } catch (YarnException e) {
+          LOG.warn(
+              "Exception while trying to expire reservation: {} for plan: {}",
+              resToResize, planQueueName, e);
+        }
+        if (scheduler.getQueue(resToResize).getNumApplications() == 0) {
+          try {
+            scheduler.removeQueue(resToResize);
+            if (isMove) {
+              drainedReservations.get(planQueueName).remove(resToResize);
+            }
+            LOG.info("Queue " + resToResize + " removed");
+          } catch (SchedulerConfigEditException e) {
+            LOG.warn(
+                "Exception while trying to remove expired reservation: {} for plan: {}",
+                resToResize, planQueueName, e);
+          }
+        }
+      }
+
+      // add new reservations and update existing ones
+      float totalAssignedCapacity = 0f;
+      if (currentReservations != null) {
+        // first release all excess capacity in the default queue
+        try {
+          scheduler
+              .setEntitlement(defPlanQName, new DynamicQueueConf(0f, 1.0f));
+        } catch (YarnException e) {
+          LOG.warn(
+              "Exception while trying to release default queue capacity for plan: {}",
+              planQueueName, e);
+        }
+        // sort allocations from the one giving up the most resources to the
+        // one asking for the most, to avoid order-of-operation errors that
+        // would temporarily violate the 100% capacity bound
+        List<ReservationAllocation> sortedAllocations =
+            sortByDelta(new ArrayList<ReservationAllocation>(
+                currentReservations), now);
+        for (ReservationAllocation res : sortedAllocations) {
+          String currResId = res.getReservationId().toString();
+          if (toAdd.contains(currResId)) {
+            ReservationQueue resQueue =
+                new ReservationQueue(scheduler, currResId,
+                    ((PlanQueue) planQueue));
+            try {
+              scheduler.addQueue(resQueue);
+            } catch (SchedulerConfigEditException e) {
+              LOG.warn(
+                  "Exception while trying to activate reservation: {} for plan: {}",
+                  currResId, planQueueName, e);
+            }
+          }
+          Resource capToAssign = res.getResourcesAtTime(now);
+          float targetCapacity = 0f;
+          if (planResources.getMemory() > 0
+              && planResources.getVirtualCores() > 0) {
+            // the reservation's share of the plan resources, as a fraction
+            targetCapacity =
+                Resources.divide(scheduler.getResourceCalculator(),
+                    clusterResources, capToAssign, planResources);
+          }
+          LOG.debug(
+              "Assigning capacity of {} to queue {} with target capacity {}",
+              capToAssign,
+              currResId, targetCapacity);
+          // set maxCapacity to 100% unless the job requires gang semantics,
+          // in which case we stick to capacity (as running early/before is
+          // likely a waste of resources)
+          float maxCapacity = 1.0f;
+          if (res.containsGangs()) {
+            maxCapacity = targetCapacity;
+          }
+          try {
+            scheduler.setEntitlement(currResId, new DynamicQueueConf(
+                targetCapacity, maxCapacity));
+          } catch (YarnException e) {
+            LOG.warn(
+                "Exception while trying to size reservation: {} for plan: {}",
+                currResId, planQueueName, e);
+          }
+          totalAssignedCapacity += targetCapacity;
+        }
+      }
+      // compute the default queue capacity
+      LOG.debug("Plan total capacity is: {} and reserved capacity is: {}",
+          planAbsCap, totalAssignedCapacity);
+      float defQCap = 1.0f - totalAssignedCapacity;
+      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+          + "currReservation: {} default-queue capacity: {}", planResources,
+          numSes, defQCap);
+
+      // set the default queue to absorb all remaining capacity
+      try {
+        scheduler.setEntitlement(defPlanQName, new DynamicQueueConf(defQCap,
+            1.0f));
+      } catch (YarnException e) {
+        LOG.warn(
+            "Exception while trying to reclaim default queue capacity for plan: {}",
+            planQueueName, e);
+      }
+    }
+    // garbage collect finished reservations from the plan
+    try {
+      plan.archiveCompletedReservations(now);
+    } catch (PlanningException e) {
+      LOG.error("Exception in archiving completed reservations: ", e);
+    }
+    LOG.info("Finished iteration of plan follower edit policy for plan: "
+        + planQueueName);
+
+    // Extension: update plan with app states,
+    // useful to support smart replanning
+  }
+
+  /**
+   * Sends a move signal to all apps running in the set of queues toMove. The
+   * apps are moved to the default queue defPlanQName.
+   *
+   * @param toMove the queues whose apps should be moved
+   * @param defPlanQName the default queue to move the apps to
+   */
+  private void moveAppsInQueues(Set<String> toMove, String defPlanQName) {
+    for (String drainReservation : toMove) {
+      // fall back to the parent's default queue
+      try {
+        if (scheduler.getAppsInQueue(drainReservation).size() > 0) {
+          scheduler.moveAllApps(drainReservation, defPlanQName);
+          LOG.info("Moving applications from queue {} to queue {}",
+              drainReservation, defPlanQName);
+        }
+      } catch (YarnException e) {
+        LOG.warn(
+            "Encountered unexpected error during migration of reservation: {}",
+            drainReservation, e);
+      }
+    }
+  }
+
+  /**
+   * Sends a kill signal to all apps running in the set of queues toRemove.
+   *
+   * @param toRemove the queues whose apps should be killed
+   */
+  private void killAppsInQueues(Set<String> toRemove) {
+    for (String expiredReservation : toRemove) {
+      try {
+        if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
+          scheduler.killAllAppsInQueue(expiredReservation);
+          LOG.info("Killing applications in queue: {}", expiredReservation);
+        }
+      } catch (YarnException e) {
+        LOG.warn(
+            "Encountered unexpected error while killing apps in expired reservation: {}",
+            expiredReservation, e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void setPlans(Collection<Plan> plans) {
+    this.plans.clear();
+    this.plans.addAll(plans);
+  }
+
+  /**
+   * Sorts reservations by the delta between the resources they currently
+   * hold and the resources they ask for at time now, from the smallest
+   * (likely negative, i.e., giving up resources) to the largest. This
+   * prevents "order-of-operation" errors that would temporarily exceed the
+   * 100% capacity bound.
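+   * For example, a reservation shrinking from 30% to 10% of the plan
+   * (delta -20%) is resized before one growing from 10% to 30% (delta +20%).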
+   */
+  private List<ReservationAllocation> sortByDelta(
+      List<ReservationAllocation> currentReservations, long now) {
+    Collections.sort(currentReservations, new ReservationAllocationComparator(
+        scheduler, now));
+    return currentReservations;
+  }
+
+  private class ReservationAllocationComparator implements
+      Comparator<ReservationAllocation> {
+    CapacityScheduler scheduler;
+    long now;
+
+    ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
+      this.scheduler = scheduler;
+      this.now = now;
+    }
+
+    @Override
+    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
+      // compute the delta between the current and the requested allocation
+      // for each reservation, and compare based on that
+      Resource lhsRes;
+      Resource clusterResource = scheduler.getClusterResource();
+      CSQueue lhsQueue = scheduler.getQueue(lhs.getReservationId().toString());
+      if (lhsQueue != null) {
+        lhsRes =
+            Resources.subtract(
+                lhs.getResourcesAtTime(now),
+                Resources.multiply(clusterResource,
+                    lhsQueue.getAbsoluteCapacity()));
+      } else {
+        lhsRes = lhs.getResourcesAtTime(now);
+      }
+      Resource rhsRes;
+      CSQueue rhsQueue = scheduler.getQueue(rhs.getReservationId().toString());
+      if (rhsQueue != null) {
+        rhsRes =
+            Resources.subtract(
+                rhs.getResourcesAtTime(now),
+                Resources.multiply(clusterResource,
+                    rhsQueue.getAbsoluteCapacity()));
+      } else {
+        rhsRes = rhs.getResourcesAtTime(now);
+      }
+      return lhsRes.compareTo(rhsRes);
+    }
+
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
new file mode 100644
index 0000000..9d00366
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * A PlanFollower is a component that runs on a timer and synchronizes the
+ * underlying {@link ResourceScheduler} with the {@link Plan}(s), and vice
+ * versa.
+ *
+ * While implementations might operate differently, the key idea is to map
+ * the current allocation of resources for each active reservation in the
+ * plan(s) to a corresponding notion in the underlying scheduler (e.g.,
+ * tuning the capacity of queues, setting pool weights, or tweaking
+ * application priorities). The goal is to affect the dynamic allocation of
+ * resources done by the scheduler so that the jobs obtain access to
+ * resources in a way that is consistent with the reservations in the plan. A
+ * key conceptual step here is to convert the absolute-valued promises made
+ * in the reservations to appropriate relative priorities/queue sizes, etc.
+ *
+ * Symmetrically, the PlanFollower exposes changes in cluster conditions (as
+ * tracked by the scheduler) to the plan, e.g., the overall amount of
+ * physical resources available. The Plan in turn can react by replanning its
+ * allocations if appropriate.
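+ *
+ * For example, a reservation promising 10 containers on a 100-container
+ * cluster maps to a queue at 10% capacity; if the cluster shrinks to 50
+ * containers, that queue must be resized to 20% to honor the same absolute
+ * promise.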
+ *
+ * The implementation can assume that it is run frequently enough to be able
+ * to observe and react to normal operational changes in cluster conditions
+ * on the fly (e.g., if cluster resources drop, we can update the relative
+ * weights of a queue so that the absolute promises made to the job at
+ * reservation time are respected).
+ *
+ * However, due to RM restarts and the related downtime, it is advisable for
+ * implementations to operate in a stateless way, and to be able to
+ * synchronize the state of plans/scheduler regardless of how big the time
+ * gap between executions is.
+ */
+public interface PlanFollower extends Runnable {
+
+  /**
+   * Init function that configures the PlanFollower by providing:
+   *
+   * @param clock a reference to the system clock
+   * @param sched a reference to the underlying scheduler
+   * @param plans references to the plans we should keep synchronized at
+   *          every time tick
+   */
+  public void init(Clock clock, ResourceScheduler sched,
+      Collection<Plan> plans);
+
+  /**
+   * The function performing the actual synchronization operation for a given
+   * Plan. This is normally invoked by the run method, but it can be invoked
+   * synchronously to avoid race conditions when a user's reservation request
+   * start time is imminent.
+   *
+   * @param plan the Plan to synchronize
+   */
+  public void synchronizePlan(Plan plan);
+
+  /**
+   * Setter for the list of plans.
+   *
+   * @param plans the collection of Plans we operate on at every time tick
+   */
+  public void setPlans(Collection<Plan> plans);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..4861951
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestCapacitySchedulerPlanFollower {
+
+  static final long TS = 3141592653L;
+
+  Clock mClock = null;
+  Configuration conf = null;
+  CapacityScheduler scheduler = null;
+  ResourceCalculator rc = new DefaultResourceCalculator();
+  ReservationAgent mAgent;
+  Plan plan;
+  Resource minAlloc = Resource.newInstance(1024, 1);
+  ResourceCalculator res = new DefaultResourceCalculator();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setup() throws IOException {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    conf = new Configuration(false);
+    mClock = mock(Clock.class);
+    scheduler = testUtil.mockCapacityScheduler(125);
+    mAgent = mock(ReservationAgent.class);
+
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+    capConf.setReservationWindow(reservationQ, 20L);
+    capConf.setMaximumCapacity(reservationQ, 40);
+    capConf.setAverageCapacity(reservationQ, 20);
+    Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    long step = 1L;
+
+    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+    policy.init(reservationQ, capConf, new HashSet());
+
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), step, res,
+            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
+            null, true);
+  }
+
+  @Test
+  public void test() throws PlanningException, InterruptedException {
+
+    // add a few reservations to the plan
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    // r1 asks for a flat 10 containers for 5 steps, starting at t=0
+    int[] f1 = { 10, 10, 10, 10, 10 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+
+    // r2 asks for the same shape, but starting at t=3
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+
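+    // r3 starts at t=10 and ramps up and then down, exercising queue
+    // resizing on every step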
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    int[] f2 = { 0, 10, 20, 10, 0 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
+                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+
+    CapacitySchedulerPlanFollower planFollower =
+        new CapacitySchedulerPlanFollower();
+    planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+
+    // at t=0 only r1 should be active
+    when(mClock.getTime()).thenReturn(0L);
+    planFollower.run();
+
+    CSQueue q = scheduler.getQueue(r1.toString());
+    assertNotNull(q);
+    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+
+    // TODO add test for maxCapacity != 100%
+    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+
+    CSQueue q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    CSQueue q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    // at t=3 both r1 and r2 should be active
+    when(mClock.getTime()).thenReturn(3L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNotNull(q);
+    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNotNull(q2);
+    Assert.assertEquals(0.1, q2.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q2.getMaximumCapacity(), 1.0);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    // at t=10 r1 and r2 have expired, and r3 is active at zero capacity
+    when(mClock.getTime()).thenReturn(10L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0, q3.getCapacity(), 0.01);
+    Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
+
+    when(mClock.getTime()).thenReturn(11L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
+
+    when(mClock.getTime()).thenReturn(12L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
+    Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
+
+    // at t=16 all reservations have expired, and the default queue should
+    // have absorbed all remaining capacity
+    when(mClock.getTime()).thenReturn(16L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    assertTrue(scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX)
+        .getCapacity() > 0.9);
+
+  }
+
+  public static ApplicationACLsManager mockAppACLsManager() {
+    Configuration conf = new Configuration();
+    return new ApplicationACLsManager(conf);
+  }
+
+}