diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index d148132..9257457 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1206,6 +1206,17 @@ protected void setAttemptRecovering(boolean isRecovering) {
     this.isAttemptRecovering = isRecovering;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SchedulerApplicationAttempt)) {
+      return false;
+    }
+
+    SchedulerApplicationAttempt other = (SchedulerApplicationAttempt) o;
+    return (this == other ||
+        this.getApplicationAttemptId().equals(other.getApplicationAttemptId()));
+  }
+
   /**
    * Different state for Application Master, user can see this state from web UI
    */
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
index 670a12d..430146c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
@@ -25,28 +25,53 @@
 import java.util.concurrent.PriorityBlockingQueue;
 
 /**
- * Helper class to track starved apps.
+ * Helper class to track starved applications.
  *
  * Initially, this uses a blocking queue. We could use other data structures
  * in the future. This class also has some methods to simplify testing.
  */
 public class FSStarvedApps {
-  private int numAppsAddedSoFar;
-  private PriorityBlockingQueue<FSAppAttempt> apps;
+
+  // Apps to be processed by the preemption thread.
+  private PriorityBlockingQueue<FSAppAttempt> appsToProcess;
+
+  // App currently being processed. This assumes a single reader.
+  private FSAppAttempt appBeingProcessed;
+
+  // Count of apps added over the life of the scheduler, for testing purposes.
+  private int totalAppsEverAdded;
 
   public FSStarvedApps() {
-    apps = new PriorityBlockingQueue<>(10, new StarvationComparator());
+    appsToProcess = new PriorityBlockingQueue<>(10, new StarvationComparator());
   }
 
+  /**
+   * Add a starved application if it is not already added.
+   * @param app application to add
+   */
   public void addStarvedApp(FSAppAttempt app) {
-    if (!apps.contains(app)) {
-      apps.add(app);
-      numAppsAddedSoFar++;
+    if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) {
+      appsToProcess.add(app);
+      totalAppsEverAdded++;
     }
   }
 
+  /**
+   * Blocking call to fetch the next app to process. The returned app is
+   * tracked until the next call to this method. This tracking assumes a
+   * single reader.
+   *
+   * @return starved application to process
+   * @throws InterruptedException
+   */
   public FSAppAttempt take() throws InterruptedException {
-    return apps.take();
+    // Reset appBeingProcessed before the blocking call
+    appBeingProcessed = null;
+
+    // Blocking call to fetch the next starved application
+    FSAppAttempt app = appsToProcess.take();
+    appBeingProcessed = app;
+    return app;
   }
 
   private static class StarvationComparator implements
@@ -64,12 +89,12 @@ public int compare(FSAppAttempt app1, FSAppAttempt app2) {
   }
 
   @VisibleForTesting
-  public int getNumAppsAddedSoFar() {
-    return numAppsAddedSoFar;
+  public int totalAppsEverAdded() {
+    return totalAppsEverAdded;
   }
 
   @VisibleForTesting
   public int numStarvedApps() {
-    return apps.size();
+    return appsToProcess.size();
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
new file mode 100644
index 0000000..0b5ba81
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test class to verify identification of app starvation.
+ */
+public class TestFSAppStarvation extends FairSchedulerTestBase {
+
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private final List<RMNode> rmNodes = new ArrayList<>();
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+  }
+
+  @After
+  public void teardown() {
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    ALLOC_FILE.delete();
+  }
+
+  /**
+   * Test to verify application starvation is computed only when preemption
+   * is enabled.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPreemptionDisabled() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendNodeUpdateEvents();
+
+    assertNull("Found starved apps even when preemption is turned off",
+        scheduler.getContext().getStarvedApps());
+  }
+
+  /**
+   * Test to verify application starvation is computed correctly when
+   * preemption is turned on.
+   * @throws Exception
+   */
+  @Test
+  public void testPreemptionEnabled() throws Exception {
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendNodeUpdateEvents();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Expecting 2 starved applications, one each for the "
+        + "minshare and fairshare queues", 2,
+        scheduler.getContext().getStarvedApps().totalAppsEverAdded());
+  }
+
+  /**
+   * Test to verify app starvation is computed only when the cluster
+   * utilization threshold is over the preemption threshold.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testClusterUtilizationThreshold() throws Exception {
+    // Set preemption threshold to 1.1, so the utilization is always lower
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
+
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendNodeUpdateEvents();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Found starved apps when preemption threshold is over 100%",
+        0, scheduler.getContext().getStarvedApps().totalAppsEverAdded());
+  }
+
+  /**
+   * Set up the cluster for starvation testing:
+   * 1. Create an FS allocation file
+   * 2. Create and start MockRM
+   * 3. Add two nodes to the cluster
+   * 4. Submit an app that uses up all resources on the cluster
+   */
+  private void setupStarvedCluster() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    // Default queue
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+
+    // Queue with preemption disabled
+    out.println("<queue name=\"no-preemption\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+
+    // Queue with minshare preemption enabled
+    out.println("<queue name=\"minshare\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<minSharePreemptionTimeout>0" +
+        "</minSharePreemptionTimeout>");
+    out.println("<minResources>2048mb,2vcores</minResources>");
+    out.println("</queue>");
+
+    // Queue with fairshare preemption enabled
+    out.println("<queue name=\"fairshare\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+
+    // Child queue under fairshare with same settings
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+
+    out.println("</queue>");
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId app
+        = createSchedulingRequest(1024, 1, "root.default", "default", 8);
+
+    scheduler.update();
+    sendNodeUpdateEvents();
+
+    assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
+  }
+
+  private void submitAppsToEachLeafQueue() {
+    String[] queues = {"no-preemption", "minshare", "fairshare.child"};
+    for (String queue : queues) {
+      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
+    }
+    scheduler.update();
+  }
+
+  private void addNode(int memory, int cores) {
+    int id = rmNodes.size() + 1;
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
+            "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
+
+  private void sendNodeUpdateEvents() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+}
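
Note on the single-reader tracking in FSStarvedApps above: addStarvedApp() relies on FSAppAttempt equality by ApplicationAttemptId (added in the SchedulerApplicationAttempt hunk) to skip both queued apps and the app the preemption thread is currently handling. The standalone sketch below mirrors that pattern outside the ResourceManager, assuming a single consumer thread; the class name StarvedTracker and the String elements are illustrative only and are not part of this patch.

import java.util.concurrent.PriorityBlockingQueue;

/**
 * Standalone illustration (not part of the patch) of the single-reader
 * tracking used by FSStarvedApps: the element being processed is remembered
 * so the producer does not re-queue it while it is being handled.
 */
public class StarvedTracker {
  // Elements waiting to be processed, smallest first (natural String order).
  private final PriorityBlockingQueue<String> toProcess =
      new PriorityBlockingQueue<>();

  // Element currently held by the consumer; assumes a single consumer thread.
  private volatile String beingProcessed;

  /** Producer side: add only if not already queued and not being processed. */
  public void add(String app) {
    if (!app.equals(beingProcessed) && !toProcess.contains(app)) {
      toProcess.add(app);
    }
  }

  /** Consumer side: block for the next element and remember it. */
  public String take() throws InterruptedException {
    beingProcessed = null;          // forget the previously processed element
    String next = toProcess.take(); // block until something is available
    beingProcessed = next;          // remember it until the next take()
    return next;
  }

  public static void main(String[] args) throws InterruptedException {
    StarvedTracker tracker = new StarvedTracker();
    tracker.add("app-2");
    tracker.add("app-1");
    tracker.add("app-1");           // duplicate, ignored (already queued)

    String first = tracker.take();  // "app-1"
    tracker.add(first);             // ignored: still being processed
    System.out.println(first + " then " + tracker.take()); // app-1 then app-2
  }
}

Clearing beingProcessed before the blocking take() means an app can be queued again once its processing round is over, while the app the consumer is currently holding is not re-queued; this matches the single-consumer assumption documented in the FSStarvedApps javadoc.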