diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 488f34e..91e804f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -81,6 +81,7 @@
   private Resource preemptedResources = Resources.createResource(0);
   private RMContainerComparator comparator = new RMContainerComparator();
   private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+  private long lastTimeAtFairShareThreshold;
 
   // Used to record node reservation by an app.
   // Key = RackName, Value = Set of Nodes reserved by app on rack
@@ -109,6 +110,8 @@ public FSAppAttempt(FairScheduler scheduler,
     this.startTime = scheduler.getClock().getTime();
     this.priority = Priority.newInstance(1);
     this.resourceWeights = new ResourceWeights();
+    this.lastTimeAtFairShareThreshold =
+        scheduler.getClock().getTime();
   }
 
   public ResourceWeights getResourceWeights() {
@@ -964,4 +967,26 @@ public RMContainer preemptContainer() {
     }
     return toBePreempted;
   }
+
+  public long getLastTimeAtFairShareThreshold() {
+    return lastTimeAtFairShareThreshold;
+  }
+
+  public void setLastTimeAtFairShareThreshold(
+      long lastTimeAtFairShareThreshold) {
+    this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
+  }
+
+  /**
+   * Is this appAttempt starved below its fair share preemption threshold?
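+   * An app counts as starved when its current usage is below
+   * min(app fair share * the queue's fairSharePreemptionThreshold, demand).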
+   */
+  public boolean isAppStarvedForFairShare() {
+    Resource needed = Resources.multiply(getFairShare(),
+        getQueue().getFairSharePreemptionThreshold());
+    Resource desiredShare = Resources.min(
+        getQueue().policy.getResourceCalculator(),
+        scheduler.getClusterResource(), needed, getDemand());
+    return Resources.lessThan(getQueue().policy.getResourceCalculator(),
+        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index ca5a146..38acd40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -271,13 +271,17 @@ public void updateDemand() {
     try {
       for (FSAppAttempt sched : runnableApps) {
         if (Resources.equals(demand, maxRes)) {
-          break;
+          // just update the app's own demand rather than the leaf queue's
+          sched.updateDemand();
+          continue;
         }
         updateDemandForApp(sched, maxRes);
       }
       for (FSAppAttempt sched : nonRunnableApps) {
         if (Resources.equals(demand, maxRes)) {
-          break;
+          // just update the app's own demand rather than the leaf queue's
+          sched.updateDemand();
+          continue;
         }
         updateDemandForApp(sched, maxRes);
       }
@@ -348,10 +352,16 @@ public Resource assignContainer(FSSchedulerNode node) {
 
   @Override
   public RMContainer preemptContainer() {
+    return preemptContainer(false);
+  }
+
+  // If force is true, the queue preempts a container even when its usage is
+  // not over its fair share; this is used for preemption within a queue.
+  public RMContainer preemptContainer(boolean force) {
     RMContainer toBePreempted = null;
 
     // If this queue is not over its fair share, reject
-    if (!preemptContainerPreCheck()) {
+    if (!force && !preemptContainerPreCheck()) {
       return toBePreempted;
     }
 
@@ -511,7 +521,8 @@ public void recoverContainer(Resource clusterResource,
   }
 
   /**
-   * Update the preemption fields for the queue, i.e. the times since last was
+   * Update the preemption fields for the queue and each runnable app in the
+   * queue, i.e. the times since it was last
    * at its guaranteed share and over its fair share threshold.
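+   * Per-app starvation is tracked here so that preemption can also fire
+   * within a queue whose aggregate usage is not starved.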
   */
   public void updateStarvationStats() {
@@ -522,6 +533,12 @@
     if (!isStarvedForFairShare()) {
       setLastTimeAtFairShareThreshold(now);
     }
+    // check each app's starvation stats as well
+    for (FSAppAttempt app : runnableApps) {
+      if (!app.isAppStarvedForFairShare()) {
+        app.setLastTimeAtFairShareThreshold(now);
+      }
+    }
   }
 
   /** Allows setting weight for a dynamically created queue
@@ -567,4 +584,8 @@ private boolean isStarved(Resource share) {
     return Resources.lessThan(policy.getResourceCalculator(),
         scheduler.getClusterResource(), resourceUsage, desiredShare);
   }
+
+  public List<FSAppAttempt> getRunnableApps() {
+    return runnableApps;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 7e013e0..f2b6738 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -23,10 +23,12 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -179,7 +181,10 @@
   protected long waitTimeBeforeKill;
 
   // Containers whose AMs have been warned that they will be preempted soon.
-  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+  // The value is the queue name for preemption within a queue;
+  // for global preemption, the value is null.
+  private Map<RMContainer, String> warnedContainers =
+      new HashMap<RMContainer, String>();
 
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
@@ -209,6 +214,9 @@
   @VisibleForTesting
   AllocationConfiguration allocConf;
 
+  // Resources preempted from each queue due to preemption within that queue
+  private Map<String, Resource> resourceDeficitWithinQueues;
+
   // Container size threshold for making a reservation.
   @VisibleForTesting
   Resource reservationThreshold;
@@ -403,16 +411,23 @@ protected synchronized void preemptTasksIfNecessary() {
     lastPreemptCheckTime = curTime;
 
     Resource resToPreempt = Resources.clone(Resources.none());
+    Map<String, Resource> withinQueueResToPreempt =
+        new HashMap<String, Resource>();
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
+      Resources.addTo(resToPreempt,
+          resourceDeficit(sched, curTime, withinQueueResToPreempt));
     }
-    if (isResourceGreaterThanNone(resToPreempt)) {
-      preemptResources(resToPreempt);
+    if (isResourceGreaterThanNone(resToPreempt)
+        || !withinQueueResToPreempt.isEmpty()) {
+      preemptResources(resToPreempt, withinQueueResToPreempt);
     }
   }
 
   /**
-   * Preempt a quantity of resources. Each round, we start from the root queue,
+   * Preempt a quantity of resources for preemption among queues, plus a
+   * queue-to-resource map for preemption within queues.
+   * Each round, we preempt resources within queues before global preemption.
+   * For global preemption, we start from the root queue,
    * level-by-level, until choosing a candidate application.
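+   * Within-queue preemption reclaims containers from over-share apps for
+   * starved apps in the same queue, even when the queue as a whole is at or
+   * above its fair share.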
   * The policy for prioritizing preemption for each queue depends on its
   * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
@@ -422,23 +437,44 @@ protected synchronized void preemptTasksIfNecessary() {
    * containers with lowest priority to preempt.
    * We make sure that no queue is placed below its fair share in the process.
    */
-  protected void preemptResources(Resource toPreempt) {
+  protected void preemptResources(Resource toPreempt,
+      Map<String, Resource> withinQueueResToPreempt) {
     long start = getClock().getTime();
-    if (Resources.equals(toPreempt, Resources.none())) {
+    if (Resources.equals(toPreempt, Resources.none()) &&
+        withinQueueResToPreempt.isEmpty()) {
       return;
     }
 
     // Scan down the list of containers we've already warned and kill them
     // if we need to. Remove any containers from the list that we don't need
     // or that are no longer running.
-    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Iterator<Entry<RMContainer, String>> warnedIter =
+        warnedContainers.entrySet().iterator();
     while (warnedIter.hasNext()) {
-      RMContainer container = warnedIter.next();
+      Entry<RMContainer, String> entry = warnedIter.next();
+      RMContainer container = entry.getKey();
+      String queue = entry.getValue();
       if ((container.getState() == RMContainerState.RUNNING ||
           container.getState() == RMContainerState.ALLOCATED) &&
-          isResourceGreaterThanNone(toPreempt)) {
+          (isResourceGreaterThanNone(toPreempt) ||
+          !withinQueueResToPreempt.isEmpty())) {
         warnOrKillContainer(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+        if (queue == null) {
+          Resources.subtractFrom(toPreempt,
+              container.getContainer().getResource());
+        } else {
+          Resource toPreemptInQueue = withinQueueResToPreempt.get(queue);
+          if (toPreemptInQueue == null) {
+            Resources.subtractFrom(toPreempt,
+                container.getContainer().getResource());
+          } else {
+            Resources.subtractFrom(toPreemptInQueue,
+                container.getContainer().getResource());
+            if (!isResourceGreaterThanNone(toPreemptInQueue)) {
+              withinQueueResToPreempt.remove(queue);
+            }
+          }
+        }
       } else {
         warnedIter.remove();
       }
@@ -450,6 +486,38 @@ protected void preemptResources(Resource toPreempt) {
       queue.resetPreemptedResources();
     }
 
+    // Preempt resources within queues before global preemption.
+    // For preemption within a queue, the container is recorded in
+    // warnedContainers as <container, queueName>; for global preemption it
+    // is recorded as <container, null>.
+    for (Entry<String, Resource> entry : withinQueueResToPreempt.entrySet()) {
+      String queueName = entry.getKey();
+      FSLeafQueue leafQueue = getQueueManager()
+          .getLeafQueue(queueName, false);
+      if (leafQueue != null) {
+        Resource toPreemptWithinQueue = entry.getValue();
+        while (isResourceGreaterThanNone(toPreemptWithinQueue)) {
+          RMContainer container = leafQueue.preemptContainer(true);
+          if (container == null) {
+            break;
+          } else {
+            warnOrKillContainer(container);
+            warnedContainers.put(container, queueName);
+            Resource preemptedWithinQueue = resourceDeficitWithinQueues
+                .get(queueName);
+            if (preemptedWithinQueue == null) {
+              preemptedWithinQueue = Resources.createResource(0);
+            }
+            Resources.addTo(preemptedWithinQueue, container.getContainer()
+                .getResource());
+            resourceDeficitWithinQueues.put(queueName, preemptedWithinQueue);
+            Resources.subtractFrom(toPreemptWithinQueue, container
+                .getContainer().getResource());
+          }
+        }
+      }
+    }
+
     while (isResourceGreaterThanNone(toPreempt)) {
       RMContainer container =
           getQueueManager().getRootQueue().preemptContainer();
       if (container == null) {
         break;
       } else {
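+        // a null value marks this container as warned for global preemption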
         warnOrKillContainer(container);
-        warnedContainers.add(container);
+        warnedContainers.put(container, null);
         Resources.subtractFrom(
             toPreempt, container.getContainer().getResource());
       }
@@ -473,6 +541,11 @@ protected void preemptResources(Resource toPreempt) {
     fsOpDurations.addPreemptCallDuration(duration);
   }
 
+  // Convenience overload kept for tests.
+  protected void preemptResources(Resource toPreempt) {
+    preemptResources(toPreempt, new HashMap<String, Resource>());
+  }
+
   private boolean isResourceGreaterThanNone(Resource toPreempt) {
     return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0);
   }
@@ -510,6 +583,7 @@ protected void warnOrKillContainer(RMContainer container) {
 
   /**
    * Return the resource amount that this queue is allowed to preempt, if any.
+   * Add any resource to preempt within the queue to withinQueueResToPreempt.
    * If the queue has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
    * this min share. If it has been below its fair share preemption threshold
@@ -518,7 +592,8 @@ protected void warnOrKillContainer(RMContainer container) {
    * max of the two amounts (this shouldn't happen unless someone sets the
    * timeouts to be identical for some reason).
    */
-  protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
+  protected Resource resourceDeficit(FSLeafQueue sched, long curTime,
+      Map<String, Resource> withinQueueResToPreempt) {
     long minShareTimeout = sched.getMinSharePreemptionTimeout();
     long fairShareTimeout = sched.getFairSharePreemptionTimeout();
     Resource resDueToMinShare = Resources.none();
@@ -538,16 +613,44 @@ protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
     }
     Resource deficit = Resources.max(calc, clusterResource,
         resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(calc, clusterResource,
-        deficit, Resources.none())) {
+
+    if (isResourceGreaterThanNone(deficit)) {
       String message = "Should preempt " + deficit + " res for queue "
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
           + ", resDueToFairShare = " + resDueToFairShare;
       LOG.info(message);
+    } else {
+      // No queue-level deficit; check whether any app in the queue needs
+      // resources preempted within the queue.
+      Resource queueResToPreempt = Resources.clone(Resources.none());
+      for (FSAppAttempt app : sched.getRunnableApps()) {
+        if (curTime - app.getLastTimeAtFairShareThreshold()
+            > fairShareTimeout) {
+          Resource target = Resources.min(calc, clusterResource,
+              app.getFairShare(), app.getDemand());
+          Resource resForApp = Resources.max(calc, clusterResource,
+              Resources.none(),
+              Resources.subtract(target, app.getResourceUsage()));
+          queueResToPreempt = Resources.add(queueResToPreempt, resForApp);
+        }
+      }
+
+      if (withinQueueResToPreempt != null &&
+          isResourceGreaterThanNone(queueResToPreempt)) {
+        withinQueueResToPreempt.put(sched.getName(), queueResToPreempt);
+        String message = "Should preempt " + queueResToPreempt
+            + " res within queue " + sched.getName();
+        LOG.info(message);
+      }
    }
+
     return deficit;
   }
 
+  // Convenience overload kept for tests.
+  protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
+    return resourceDeficit(sched, curTime, null);
+  }
+
   public synchronized RMContainerTokenSecretManager
       getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
@@ -1157,8 +1260,34 @@ synchronized void attemptScheduling(FSSchedulerNode node) {
     int assignedContainers = 0;
     while (node.getReservedContainer() == null) {
       boolean assignedContainer = false;
-      if (!queueMgr.getRootQueue().assignContainer(node).equals(
-          Resources.none())) {
+      Resource assignedResource = Resources.none();
+      // first assign to queues owed resources preempted within the queue
+      Iterator<Entry<String, Resource>> deficitResIter =
+          resourceDeficitWithinQueues.entrySet().iterator();
+      while (deficitResIter.hasNext()) {
+        Entry<String, Resource> entry = deficitResIter.next();
+        String queue = entry.getKey();
+        Resource resDeficitWithinQueue = entry.getValue();
+        if (isResourceGreaterThanNone(resDeficitWithinQueue)) {
+          FSLeafQueue leafQueue = queueMgr.getLeafQueue(queue, false);
+          if (leafQueue != null) {
+            assignedResource = leafQueue.assignContainer(node);
+            if (!assignedResource.equals(Resources.none())) {
+              assignedContainers++;
+              assignedContainer = true;
+              Resources.subtractFrom(resDeficitWithinQueue, assignedResource);
+              if (!isResourceGreaterThanNone(resDeficitWithinQueue)) {
+                deficitResIter.remove();
+              }
+              break;
+            }
+          }
+        }
+      }
+      if (assignedResource.equals(Resources.none())) {
+        assignedResource = queueMgr.getRootQueue().assignContainer(node);
+      }
+      if (!assignedResource.equals(Resources.none())) {
         assignedContainers++;
         assignedContainer = true;
       }
@@ -1422,6 +1551,8 @@ private void initScheduler(Configuration conf) throws IOException {
       throw new IOException("Failed to start FairScheduler", e);
     }
 
+    resourceDeficitWithinQueues = new ConcurrentHashMap<String, Resource>();
+
     updateThread = new UpdateThread();
     updateThread.setName("FairSchedulerUpdateThread");
     updateThread.setDaemon(true);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index fac28b7..5438f12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2820,6 +2820,162 @@ public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
   }
 
   @Test
+  /**
+   * Tests the timing of the decision to preempt tasks within a queue.
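+   * Both queues end up at their fair share, but app3 in queueB stays below
+   * its in-queue fair share, so only within-queue preemption should fire.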
+   */
+  public void testPreemptionDecisionWithinQueue() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<maxResources>4096mb,4vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<maxResources>4096mb,4vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create four nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
+            "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 4,
+            "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Queue A and B each request 4 containers
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+
+    // Now new requests arrive from queue B
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+
+    scheduler.update();
+
+    FSLeafQueue schedA =
+        scheduler.getQueueManager().getLeafQueue("queueA", true);
+    FSLeafQueue schedB =
+        scheduler.getQueueManager().getLeafQueue("queueB", true);
+
+    assertTrue(Resources.equals(Resources.none(),
+        scheduler.resourceDeficit(schedA, clock.getTime(), null)));
+    assertTrue(Resources.equals(Resources.none(),
+        scheduler.resourceDeficit(schedB, clock.getTime(), null)));
+    // After the fair share preemption timeout has passed, app3 should want
+    // to preempt resources within queueB.
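+    // defaultFairSharePreemptionTimeout is 10s in the alloc file above, so
+    // ticking 12s crosses the per-app fair share timeout.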
+    clock.tickSec(12);
+    // Fair share and min share are now satisfied for both queues, but one
+    // app is starved:
+    //         fairshare  usage    starved
+    // queueA  4096mb     4096mb   no
+    // queueB  4096mb     4096mb   no
+    // app1    4096mb     4096mb   no
+    // app2    2048mb     4096mb   no
+    // app3    2048mb     0mb      yes
+    Map<String, Resource> internalResToPreempt =
+        new HashMap<String, Resource>();
+    assertTrue(Resources.equals(Resources.none(),
+        scheduler.resourceDeficit(schedA, clock.getTime(),
+            internalResToPreempt)));
+    assertTrue(Resources.equals(Resources.none(),
+        scheduler.resourceDeficit(schedB, clock.getTime(),
+            internalResToPreempt)));
+    assertEquals(2048,
+        internalResToPreempt.get(schedB.getName()).getMemory());
+    // now preempt the resources
+    scheduler.preemptResources(Resources.none(), internalResToPreempt);
+    // only app2 should be selected for preemption
+    assertTrue("App2 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App1 should not have container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app1).getLiveContainers(),
+            scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+
+    scheduler.update();
+    clock.tickSec(20);
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+    // after preemption
+    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+  }
+
+  @Test
   public void testBackwardsCompatiblePreemptionConfiguration()
       throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 6f759ce..71e2e14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
@@ -37,6 +36,7 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -51,7 +51,8 @@
   public int lastPreemptMemory = -1;
 
   @Override
-  protected void preemptResources(Resource toPreempt) {
+  protected void preemptResources(Resource toPreempt,
+      Map<String, Resource> withinQueueResToPreempt) {
     lastPreemptMemory = toPreempt.getMemory();
   }