diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index f5bc2cd..666829c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -81,12 +82,14 @@ private Resource fairShare = Resources.createResource(0, 0); // Preemption related variables - private Resource fairshareStarvation = Resources.none(); - private Resource minshareStarvation = Resources.none(); private final Resource preemptedResources = Resources.clone(Resources.none()); private final Set containersToPreempt = new HashSet<>(); + private Resource fairshareStarvation = Resources.none(); private long lastTimeAtFairShare; + // minShareStarvation attributed to this application by the leaf queue + private Resource minshareStarvation = Resources.none(); + // Used to record node reservation by an app. // Key = RackName, Value = Set of Nodes reserved by app on rack private Map> reservations = new HashMap<>(); @@ -149,7 +152,7 @@ void containerCompleted(RMContainer rmContainer, // Remove from the list of containers liveContainers.remove(rmContainer.getContainerId()); - removePreemption(rmContainer); + untrackContainerForPreemption(rmContainer); Resource containerResource = rmContainer.getContainer().getResource(); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, @@ -510,26 +513,42 @@ public FSLeafQueue getQueue() { } // Preemption related methods + + /** + * Get overall starvation - fairshare and attributed minshare + * + * @return total starvation attributed to this application + */ Resource getStarvation() { return Resources.add(fairshareStarvation, minshareStarvation); } + /** + * Set the minshare attributed to this application. To be called only from + * {@link FSLeafQueue#updateStarvedApps}. + * + * @param starvation minshare starvation attributed to this app + */ void setMinshareStarvation(Resource starvation) { this.minshareStarvation = starvation; } + /** + * Reset the minshare starvation attributed to this application. To be + * called only from {@link FSLeafQueue#updateStarvedApps} + */ void resetMinshareStarvation() { this.minshareStarvation = Resources.none(); } - void addPreemption(RMContainer container) { + void trackContainerForPreemption(RMContainer container) { containersToPreempt.add(container); synchronized (preemptedResources) { Resources.addTo(preemptedResources, container.getAllocatedResource()); } } - void removePreemption(RMContainer container) { + private void untrackContainerForPreemption(RMContainer container) { synchronized (preemptedResources) { Resources.subtractFrom(preemptedResources, container.getAllocatedResource()); @@ -540,7 +559,6 @@ void removePreemption(RMContainer container) { Set getPreemptionContainers() { return containersToPreempt; } - private Resource getPreemptedResources() { synchronized (preemptedResources) { @@ -563,8 +581,8 @@ boolean canContainerBePreempted(RMContainer container) { } // Check if any of the parent queues are not preemptable - // TODO (KK): Propagate the "preemptable" flag all the way down to the app - // to avoid recursing up every time. + // TODO (YARN-5831): Propagate the "preemptable" flag all the way down to + // the app to avoid recursing up every time. for (FSQueue q = getQueue(); !q.getQueueName().equals("root"); q = q.getParent()) { @@ -585,8 +603,9 @@ boolean canContainerBePreempted(RMContainer container) { /** * Create and return a container object reflecting an allocation for the - * given appliction on the given node with the given capability and + * given application on the given node with the given capability and * priority. + * * @param node Node * @param capability Capability * @param schedulerKey Scheduler Key @@ -1076,6 +1095,15 @@ ResourceRequest getNextResourceRequest() { return appSchedulingInfo.getNextResourceRequest(); } + /** + * Helper method that captures if this app is identified to be starved. + * @return true if the app is starved for fairshare, false otherwise + */ + @VisibleForTesting + boolean isStarvedForFairShare() { + return !Resources.isNone(fairshareStarvation); + } + /* Schedulable methods implementation */ @Override @@ -1105,14 +1133,13 @@ public Resource getMaxShare() { @Override public Resource getResourceUsage() { - // Here the getPreemptedResources() always return zero, except in - // a preemption round - // In the common case where preempted resource is zero, return the - // current consumption Resource object directly without calling - // Resources.subtract which creates a new Resource object for each call. - return getPreemptedResources().equals(Resources.none()) ? - getCurrentConsumption() : - Resources.subtract(getCurrentConsumption(), getPreemptedResources()); + /* + * getResourcesToPreempt() returns zero, except when there are containers + * to preempt. Avoid creating an object in the common case. + */ + return getPreemptedResources().equals(Resources.none()) + ? getCurrentConsumption() + : Resources.subtract(getCurrentConsumption(), getPreemptedResources()); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3fcf627..0b4cc73 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -49,8 +49,9 @@ @Private @Unstable public class FSLeafQueue extends FSQueue { - private static final Log LOG = LogFactory.getLog( - FSLeafQueue.class.getName()); + private static final Log LOG = LogFactory.getLog(FSLeafQueue.class.getName()); + private static final List EMPTY_LIST = Collections.emptyList(); + private FairScheduler scheduler; private FSContext context; @@ -71,7 +72,6 @@ private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; - private static final List EMPTY_LIST = Collections.emptyList(); public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { @@ -210,7 +210,7 @@ public void updateInternal(boolean checkStarvation) { try { policy.computeShares(runnableApps, getFairShare()); if (checkStarvation) { - updatedStarvedApps(); + updateStarvedApps(); } } finally { readLock.unlock(); @@ -234,7 +234,7 @@ public void updateInternal(boolean checkStarvation) { * one application that is starved. And, even if the queue is not * starved due to fairshare, there might still be starved applications. */ - private void updatedStarvedApps() { + private void updateStarvedApps() { // First identify starved applications and track total amount of // starvation (in resources) Resource fairShareStarvation = Resources.clone(none()); @@ -549,10 +549,33 @@ private Resource minShareStarvation() { /** * Helper method for tests to check if a queue is starved for minShare. - * @return whether starved for minShare. + * @return whether starved for minshare. */ @VisibleForTesting - boolean isStarvedForMinShare() { + private boolean isStarvedForMinShare() { return !Resources.isNone(minShareStarvation()); } + + /** + * Helper method for tests to check if a queue is starved for minShare. + * @return whether starved for fairshare. + */ + @VisibleForTesting + private boolean isStarvedForFairShare() { + for (FSAppAttempt app : runnableApps) { + if (app.isStarvedForFairShare()) { + return true; + } + } + return false; + } + + /** + * Helper method for tests to check if a queue is starved for minShare. + * @return whether starved for either minshare or fairshare + */ + @VisibleForTesting + boolean isStarved() { + return isStarvedForMinShare() || isStarvedForFairShare(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 01c830c..0a23662 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -36,14 +36,14 @@ /** * Thread that handles FairScheduler preemption. */ -public class FSPreemptionThread extends Thread { +class FSPreemptionThread extends Thread { private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class); protected final FSContext context; private final FairScheduler scheduler; private final long warnTimeBeforeKill; private final Timer preemptionTimer; - public FSPreemptionThread(FairScheduler scheduler) { + FSPreemptionThread(FairScheduler scheduler) { this.scheduler = scheduler; this.context = scheduler.getContext(); FairSchedulerConfiguration fsConf = scheduler.getConf(); @@ -80,8 +80,10 @@ public void run() { * Given an app, identify containers to preempt to satisfy the app's next * resource request. * - * @param starvedApp - * @return + * @param starvedApp starved application for which we are identifying + * preemption targets + * @return list of containers to preempt to satisfy starvedApp, null if the + * app cannot be satisfied by preempting any running containers. */ private List identifyContainersToPreempt( FSAppAttempt starvedApp) { @@ -103,14 +105,13 @@ public void run() { // Reset containers for the new node being considered. containers.clear(); + // TODO (YARN-5829): Attempt to reserve the node for starved app. The + // subsequent if-check needs to be reworked accordingly. FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) { // This node is already reserved by another app. Let us not consider // this for preemption. continue; - - // TODO (KK): If the nodeReservedApp is over its fairshare, may be it - // is okay to unreserve it if we find enough resources. } // Figure out list of containers to consider @@ -137,13 +138,15 @@ public void run() { // FSSchedulerNode#removeContainerForPreemption. node.addContainersForPreemption(containers); return containers; + } else { + // TODO (YARN-5829): Unreserve the node for the starved app. } } } return null; } - public void preemptContainers(List containers) { + private void preemptContainers(List containers) { // Warn application about containers to be killed for (RMContainer container : containers) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); @@ -151,7 +154,7 @@ public void preemptContainers(List containers) { FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container " + container + " from queue " + queue.getName()); - app.addPreemption(container); + app.trackContainerForPreemption(container); } // Schedule timer task to kill containers diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index a605af6..a27a222 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -105,7 +105,7 @@ public synchronized void unreserveResource( this.reservedAppSchedulable = null; } - public synchronized FSAppAttempt getReservedAppSchedulable() { + synchronized FSAppAttempt getReservedAppSchedulable() { return reservedAppSchedulable; } @@ -118,14 +118,14 @@ public synchronized FSAppAttempt getReservedAppSchedulable() { * * @param containers container to mark */ - public void addContainersForPreemption(Collection containers) { + void addContainersForPreemption(Collection containers) { containersForPreemption.addAll(containers); } /** * @return set of containers marked for preemption. */ - public Set getContainersForPreemption() { + Set getContainersForPreemption() { return containersForPreemption; } @@ -134,7 +134,7 @@ public void addContainersForPreemption(Collection containers) { * * @param container container to remove */ - public void removeContainerForPreemption(RMContainer container) { + void removeContainerForPreemption(RMContainer container) { containersForPreemption.remove(container); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index abe8a6a..571f2e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -998,13 +998,6 @@ private void updateRootQueueMetrics() { * Check if preemption is enabled and the utilization threshold for * preemption is met. * - * TODO (KK): Should we handle the case where usage is less than preemption - * threshold, but there are applications requesting resources on nodes that - * are otherwise occupied by long running applications over their - * fairshare? What if they are occupied by applications not over their - * fairshare? Does this mean YARN should not allocate all resources on a - * node to long-running services? - * * @return true if preemption should be attempted, false otherwise. */ private boolean shouldAttemptPreemption() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java index 323152d..382ce1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -45,6 +45,8 @@ // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; + private static final String queues[] = + {"no-preemption", "minshare", "fairshare.child", "drf.child"}; private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread; @@ -94,12 +96,14 @@ public void testPreemptionEnabled() throws Exception { assertNotNull("FSContext does not have an FSStarvedApps instance", scheduler.getContext().getStarvedApps()); assertEquals("Expecting 2 starved applications, one each for the " + - "minshare and fairshare queues", 2, + "minshare and fairshare queues", 3, preemptionThread.uniqueAppsAdded()); // Verify the apps get added again on a subsequent update scheduler.update(); Thread.yield(); + + verifyLeafQueueStarvation(); assertTrue("Each app is marked as starved exactly once", preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded()); } @@ -121,6 +125,16 @@ public void testClusterUtilizationThreshold() throws Exception { preemptionThread.totalAppsAdded()); } + private void verifyLeafQueueStarvation() { + for (String q : queues) { + if (!q.equals("no-preemption")) { + boolean isStarved = + scheduler.getQueueManager().getLeafQueue(q, false).isStarved(); + assertTrue(isStarved); + } + } + } + private void setupClusterAndSubmitJobs() throws Exception { setupStarvedCluster(); submitAppsToEachLeafQueue(); @@ -173,15 +187,18 @@ private void setupStarvedCluster() throws IOException { ""); out.println("0" + ""); + out.println("fair"); + addChildQueue(out); + out.println(""); - // Child queue under fairshare with same settings - out.println(""); + // Queue with fairshare preemption enabled + out.println(""); out.println("1" + ""); out.println("0" + ""); - out.println(""); - + out.println("drf"); + addChildQueue(out); out.println(""); out.println(""); @@ -210,8 +227,17 @@ private void setupStarvedCluster() throws IOException { assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); } + private void addChildQueue(PrintWriter out) { + // Child queue under fairshare with same settings + out.println(""); + out.println("1" + + ""); + out.println("0" + + ""); + out.println(""); + } + private void submitAppsToEachLeafQueue() { - String queues[] = {"no-preemption", "minshare", "fairshare.child"}; for (String queue : queues) { createSchedulingRequest(1024, 1, "root." + queue, "user", 1); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index b8f4a4d..98de8db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -105,12 +105,8 @@ public void test() throws Exception { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); + out.println(""); + out.println(""); out.println(""); out.close(); @@ -143,162 +139,6 @@ public void test() throws Exception { scheduler.update(); Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); - - // Queue A should be above min share, B below. - FSLeafQueue queueA = - scheduler.getQueueManager().getLeafQueue("queueA", false); - FSLeafQueue queueB = - scheduler.getQueueManager().getLeafQueue("queueB", false); -// TODO: assertFalse(queueA.isStarvedForMinShare()); -// TODO: assertTrue(queueB.isStarvedForMinShare()); - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // Now B should have min share ( = demand here) -// TODO: assertFalse(queueB.isStarvedForMinShare()); - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(".2"); - out.println(""); - out.println(""); - out.println(".8"); - out.println(".4"); - out.println(""); - out.println(""); - out.println(""); - out.println(".6"); - out.println(""); - out.println(""); - out.println(".5"); - out.println(""); - out.close(); - - resourceManager = new MockRM(conf); - resourceManager.start(); - scheduler = (FairScheduler) resourceManager.getResourceScheduler(); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - scheduler.update(); - - // Queue A wants 4 * 1024. Node update gives this all to A - createSchedulingRequest(1 * 1024, "queueA", "user1", 4); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - for (int i = 0; i < 4; i ++) { - scheduler.handle(nodeEvent2); - } - - QueueManager queueMgr = scheduler.getQueueManager(); - FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); - assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize()); - - // Both queue B1 and queue B2 want 3 * 1024 - createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); - createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); - scheduler.update(); - for (int i = 0; i < 4; i ++) { - scheduler.handle(nodeEvent2); - } - - FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); - FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); - assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize()); - assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize()); - - // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share - // threshold is 1.6 * 1024 -// TODO: assertFalse(queueB1.isStarvedForFairShare()); - - // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share - // threshold is 2.4 * 1024 -// TODO: assertTrue(queueB2.isStarvedForFairShare()); - - // Node checks in again - scheduler.handle(nodeEvent2); - scheduler.handle(nodeEvent2); - assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize()); - assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize()); - - // Both queue B1 and queue B2 usages go to 3 * 1024 -// TODO: assertFalse(queueB1.isStarvedForFairShare()); -// TODO: assertFalse(queueB2.isStarvedForFairShare()); - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShareDRF() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(".5"); - out.println(""); - out.println(""); - out.println(".5"); - out.println(""); - out.println("1"); - out.println("drf"); - out.println(""); - out.close(); - - resourceManager = new MockRM(conf); - resourceManager.start(); - scheduler = (FairScheduler) resourceManager.getResourceScheduler(); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - scheduler.update(); - - // Queue A wants 7 * 1024, 1. Node update gives this all to A - createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - QueueManager queueMgr = scheduler.getQueueManager(); - FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); - assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize()); - assertEquals(1, queueA.getResourceUsage().getVirtualCores()); - - // Queue B has 3 reqs : - // 1) 2 * 1024, 5 .. which will be granted - // 2) 1 * 1024, 1 .. which will be granted - // 3) 1 * 1024, 1 .. which wont - createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1); - createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2); - scheduler.update(); - for (int i = 0; i < 3; i ++) { - scheduler.handle(nodeEvent2); - } - - FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false); - assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize()); - assertEquals(6, queueB.getResourceUsage().getVirtualCores()); - - scheduler.update(); - - // Verify that Queue us not starved for fair share.. - // Since the Starvation logic now uses DRF when the policy = drf, The - // Queue should not be starved -// TODO: assertFalse(queueB.isStarvedForFairShare()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java new file mode 100644 index 0000000..b2fb725 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import static org.junit.Assert.assertEquals; + +public class TestQueueManagerRealScheduler extends FairSchedulerTestBase { + private final static File ALLOC_FILE = new File(TEST_DIR, "test-queue-mgr"); + + @Before + public void setup() throws IOException { + createConfiguration(); + writeAllocFile(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + ALLOC_FILE.getAbsolutePath()); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + } + + @After + public void teardown() { + ALLOC_FILE.deleteOnExit(); + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + private void writeAllocFile() throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.print("15"); + out.print("30"); + out.print("40"); + out.println(""); + out.close(); + } + + @Test + public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { + // Check the min/fair share preemption timeout for each queue + QueueManager queueMgr = scheduler.getQueueManager(); + assertEquals(30000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("default") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueA") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB1") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB2") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueC") + .getFairSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("root") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("default") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueA") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB") + .getMinSharePreemptionTimeout()); + assertEquals(5000, queueMgr.getQueue("queueB.queueB1") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB.queueB2") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueC") + .getMinSharePreemptionTimeout()); + + // If both exist, we take the default one + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.print("15"); + out.print("25"); + out.print("30"); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + assertEquals(25000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + } +}