diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java index 56bc99c..776ca16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java @@ -26,7 +26,7 @@ // Preemption-related info private boolean preemptionEnabled = false; private float preemptionUtilizationThreshold; - private FSStarvedApps starvedApps; + private FSStarvedApps starvedApps = new FSStarvedApps(); public boolean isPreemptionEnabled() { return preemptionEnabled; @@ -34,9 +34,6 @@ public boolean isPreemptionEnabled() { public void setPreemptionEnabled() { this.preemptionEnabled = true; - if (starvedApps == null) { - starvedApps = new FSStarvedApps(); - } } public FSStarvedApps getStarvedApps() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 3732086..dafd551 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -38,15 +38,17 @@ */ public class FSPreemptionThread extends Thread { private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class); - private final FSContext context; + private final FairScheduler scheduler; + private final FSStarvedApps starvedApps; private final long warnTimeBeforeKill; private final Timer preemptionTimer; public FSPreemptionThread(FairScheduler scheduler) { this.scheduler = scheduler; - this.context = scheduler.getContext(); + FSContext context = scheduler.getContext(); FairSchedulerConfiguration fsConf = scheduler.getConf(); + starvedApps = context.getStarvedApps(); context.setPreemptionEnabled(); context.setPreemptionUtilizationThreshold( fsConf.getPreemptionUtilizationThreshold()); @@ -60,8 +62,8 @@ public FSPreemptionThread(FairScheduler scheduler) { public void run() { while (!Thread.interrupted()) { FSAppAttempt starvedApp; - try{ - starvedApp = context.getStarvedApps().take(); + try { + starvedApp = starvedApps.nextAppToProcess(); if (!Resources.isNone(starvedApp.getStarvation())) { List containers = identifyContainersToPreempt(starvedApp); @@ -69,6 +71,7 @@ public void run() { preemptContainers(containers); } } + starvedApps.appProcessed(starvedApp); } catch (InterruptedException e) { LOG.info("Preemption thread interrupted! Exiting."); return; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java index 670a12d..13209b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java @@ -21,7 +21,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.io.Serializable; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.concurrent.PriorityBlockingQueue; /** @@ -31,22 +33,30 @@ * in the future. This class also has some methods to simplify testing. */ public class FSStarvedApps { - private int numAppsAddedSoFar; + private int totalAppsEverAdded; private PriorityBlockingQueue apps; + private List appsBeingProcessed; public FSStarvedApps() { apps = new PriorityBlockingQueue<>(10, new StarvationComparator()); + appsBeingProcessed = new ArrayList<>(); } public void addStarvedApp(FSAppAttempt app) { - if (!apps.contains(app)) { + if (!apps.contains(app) && !appsBeingProcessed.contains(app)) { apps.add(app); - numAppsAddedSoFar++; + totalAppsEverAdded++; } } - public FSAppAttempt take() throws InterruptedException { - return apps.take(); + public FSAppAttempt nextAppToProcess() throws InterruptedException { + FSAppAttempt app = apps.take(); + appsBeingProcessed.add(app); + return app; + } + + public void appProcessed(FSAppAttempt app) { + appsBeingProcessed.remove(app); } private static class StarvationComparator implements @@ -64,8 +74,8 @@ public int compare(FSAppAttempt app1, FSAppAttempt app2) { } @VisibleForTesting - public int getNumAppsAddedSoFar() { - return numAppsAddedSoFar; + public int getTotalAppsEverAdded() { + return totalAppsEverAdded; } @VisibleForTesting diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java new file mode 100644 index 0000000..095a81f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; + +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; + +/** + * Test class to verify identification of app starvation + */ +public class TestFSAppStarvation extends FairSchedulerTestBase { + + private final static String ALLOC_FILE = + new File(TEST_DIR, "test-queues").getAbsolutePath(); + + private List rmNodes = new ArrayList<>(); + + @Before + public void setup() { + createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + } + + @After + public void teardown() { + conf = null; + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + new File(ALLOC_FILE).delete(); + } + + @Test + public void testPreemptionDisabled() throws Exception { + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false); + setupStarvedCluster(); + submitAppsToEachLeafQueue(); + sendNodeUpdateEvents(); + + assertEquals("Found starved apps even when preemption is turned off", 0, + scheduler.getContext().getStarvedApps().getTotalAppsEverAdded()); + } + + @Test + public void testPreemptionEnabled() throws Exception { + setupStarvedCluster(); + submitAppsToEachLeafQueue(); + sendNodeUpdateEvents(); + + Thread.sleep(10); + scheduler.update(); + + assertEquals("Incorrect starved applications", 2, + scheduler.getContext().getStarvedApps().getTotalAppsEverAdded()); + } + + @Test + public void testClusterUtilizationThreshold() throws Exception { + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f); + setupStarvedCluster(); + submitAppsToEachLeafQueue(); + sendNodeUpdateEvents(); + + assertEquals("Found starved apps when preemption threshold is over 100%", 0, + scheduler.getContext().getStarvedApps().getTotalAppsEverAdded()); + } + + /** + * Setup the cluster for starvation testing: + * 1. Create FS allocation file + * 2. Create and start MockRM + * 3. Add two nodes to the cluster + * 4. Submit an app that uses up all resources on the cluster + */ + private void setupStarvedCluster() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + + // Default queue + out.println(""); + out.println(""); + + // Queue with preemption disabled + out.println(""); + out.println("0" + + ""); + out.println(""); + + // Queue with minshare preemption enabled + out.println(""); + out.println("0" + + ""); + out.println("0" + + ""); + out.println("2048mb,2vcores"); + out.println(""); + + // Queue with fairshare preemption enabled + out.println(""); + out.println("1" + + ""); + out.println("0" + + ""); + + // Child queue under fairshare with same settings + out.println(""); + out.println("1" + + ""); + out.println("0" + + ""); + out.println(""); + + out.println(""); + + out.println(""); + out.close(); + + assertTrue("Allocation file does not exist, not running the test", + new File(ALLOC_FILE).exists()); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Create two nodes and add them + for (int i = 0; i < 2; i++) { + addNode(4 * 1024, 4); + } + + // Create an app that takes up all the resources on the cluster + ApplicationAttemptId app + = createSchedulingRequest(1024, 1, "root.default", "default", 8); + + scheduler.update(); + sendNodeUpdateEvents(); + + assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); + } + + private void submitAppsToEachLeafQueue() { + String queues[] = {"no-preemption", "minshare", "fairshare.child"}; + for (int i = 0; i < queues.length; i++) { + createSchedulingRequest(1024, 1, "root." + queues[i], "user", 1); + } + scheduler.update(); + } + + private void addNode(int memory, int cores) { + int id = rmNodes.size() + 1; + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id, + "127.0.0." + id); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + rmNodes.add(node); + } + + private void sendNodeUpdateEvents() { + for (RMNode node : rmNodes) { + NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = + new NodeUpdateSchedulerEvent(node); + for (int i = 0; i < 4; i++) { + scheduler.handle(nodeUpdateSchedulerEvent); + } + } + } +}