diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index da5e4c9347e..5a592137ef6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1207,11 +1207,19 @@ private void updateRootQueueMetrics() {
    */
   private boolean shouldAttemptPreemption() {
     if (context.isPreemptionEnabled()) {
-      return (context.getPreemptionUtilizationThreshold() < Math.max(
+      float maxUtilizationOfCustomResources = rootMetrics
+          .getMaxUtilizationOfCustomResources(getClusterResource());
+
+      float maxUtilizationOfNormalResources = Math.max(
           (float) rootMetrics.getAllocatedMB() /
               getClusterResource().getMemorySize(),
           (float) rootMetrics.getAllocatedVirtualCores() /
-              getClusterResource().getVirtualCores()));
+              getClusterResource().getVirtualCores());
+
+      float utilization = Math.max(maxUtilizationOfNormalResources,
+          maxUtilizationOfCustomResources);
+
+      return (context.getPreemptionUtilizationThreshold() < utilization);
     }
     return false;
   }
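The hunk above widens the preemption gate from memory and vcores alone to the dominant utilization across all registered resource types. A minimal standalone sketch of the new decision, with hypothetical parameter names (the patched method reads the same values from rootMetrics and getClusterResource(), and getMaxUtilizationOfCustomResources is assumed to be added to the queue metrics elsewhere in this change):

```java
// Minimal sketch of the new gate; parameter names are hypothetical.
// The patched method reads these values from rootMetrics and
// getClusterResource() rather than taking arguments.
public final class PreemptionGateSketch {

  static boolean shouldAttemptPreemption(float threshold,
      long allocatedMb, long clusterMb,
      long allocatedVcores, long clusterVcores,
      float maxCustomResourceUtilization) {
    // Utilization of the two built-in resources, as before.
    float normal = Math.max(
        (float) allocatedMb / clusterMb,
        (float) allocatedVcores / clusterVcores);
    // New: the dominant utilization across every resource type decides.
    return threshold < Math.max(normal, maxCustomResourceUtilization);
  }

  public static void main(String[] args) {
    // Memory and vcores idle, one custom resource fully allocated:
    // the old check returned false here, the new one returns true.
    System.out.println(shouldAttemptPreemption(
        0.5f, 0L, 8192L, 0L, 24L, 1.0f)); // true
  }
}
```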
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 4f1f20b942b..46837a4c0c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -60,6 +61,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 public class FairSchedulerTestBase {
   public final static String TEST_DIR =
@@ -357,4 +359,15 @@ protected void addNode(int memory, int cores) {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
     rmNodes.add(node);
   }
+
+  protected void addNode(int memory, int cores,
+      Map<String, String> customResources) {
+    int id = rmNodes.size() + 1;
+    RMNode node = MockNodes.newNodeInfo(1,
+        ResourceTypesTestHelper.newResource(memory, cores, customResources),
+        id, "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
+
 }
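The new addNode overload mirrors the existing two-argument version but lets a test attach custom resource values to the mock node. A hedged usage sketch (a fragment that would live inside a FairSchedulerTestBase subclass; the resource names and sizes are illustrative):

```java
// Fragment from a hypothetical FairSchedulerTestBase subclass.
// Needs: import java.util.Map; import com.google.common.collect.ImmutableMap;
Map<String, String> customResources =
    ImmutableMap.of("custom_res_1", "40", "custom_res_2", "80");
// A 4 GB / 12-vcore node that also advertises the two custom resources;
// values are strings because ResourceTypesTestHelper.newResource parses
// them into the node's Resource.
addNode(4 * 1024, 12, customResources);
```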
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..7d7aee8acbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -17,21 +17,25 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.curator.shaded.com.google.common.base.Joiner;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,6 +50,13 @@
 import java.util.Collection;
 import java.util.List;
 
+import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
 /**
  * Tests to verify fairshare and minshare preemption, using parameterization.
  */
@@ -60,8 +71,12 @@
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
 
+  private static final String CUSTOM_RES_1 = "custom_res_1";
+  private static final String CUSTOM_RES_2 = "custom_res_2";
+
   private final boolean fairsharePreemption;
   private final boolean drf;
+  private final boolean useCustomResources;
 
   // App that takes up the entire cluster
   private FSAppAttempt greedyApp;
@@ -69,20 +84,22 @@
   // Starving app that is expected to instigate preemption
   private FSAppAttempt starvingApp;
 
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "{0} - {2}")
   public static Collection<Object[]> getParameters() {
     return Arrays.asList(new Object[][] {
-        {"MinSharePreemption", 0},
-        {"MinSharePreemptionWithDRF", 1},
-        {"FairSharePreemption", 2},
-        {"FairSharePreemptionWithDRF", 3}
+        {"MinSharePreemption", 0, false},
+        {"MinSharePreemptionWithDRF", 1, false},
+        {"FairSharePreemption", 2, false},
+        {"FairSharePreemptionWithDRF", 3, false},
+        {"FairSharePreemptionWithDRF", 3, true},
     });
   }
 
-  public TestFairSchedulerPreemption(String name, int mode)
-      throws IOException {
+  public TestFairSchedulerPreemption(String name, int mode,
+      boolean useCustomResources) throws IOException {
     fairsharePreemption = (mode > 1); // 2 and 3
     drf = (mode % 2 == 1); // 1 and 3
+    this.useCustomResources = useCustomResources;
     writeAllocFile();
   }
 
@@ -94,9 +111,21 @@ public void setup() throws IOException {
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    if (useCustomResources) {
+      registerCustomResource(CUSTOM_RES_1);
+      registerCustomResource(CUSTOM_RES_2);
+      String resourceNames = Joiner.on(',').join(CUSTOM_RES_1, CUSTOM_RES_2);
+      conf.set(YarnConfiguration.RESOURCE_TYPES, resourceNames);
+      ResourceUtils.resetResourceTypes(conf);
+    }
     setupCluster();
   }
 
+  private void registerCustomResource(String name) {
+    conf.set(YarnConfiguration.RESOURCE_TYPES + "." + name + UNITS,
+        "G");
+  }
+
   @After
   public void teardown() {
     ALLOC_FILE.delete();
@@ -185,7 +214,12 @@ private void writePreemptionParams(PrintWriter out) {
 
   private void writeResourceParams(PrintWriter out) {
     if (!fairsharePreemption) {
-      out.println("<maxResources>4096mb,4vcores</maxResources>");
+      if (useCustomResources) {
+        out.println("<maxResources>memory-mb=4096,vcores=4,"
+            + "custom_res_1=20,custom_res_2=40</maxResources>");
+      } else {
+        out.println("<maxResources>4096mb,4vcores</maxResources>");
+      }
     }
   }
 
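registerCustomResource composes per-resource property keys from YarnConfiguration.RESOURCE_TYPES and ResourceUtils.UNITS. Assuming those constants expand to "yarn.resource-types" and ".units" (their conventional values, stated here as an assumption), the custom-resource branch of setup() is equivalent to this plain-configuration sketch:

```java
// Equivalent effect of setup()'s custom-resource branch, written with
// literal keys. The constant expansions are an assumption; the test
// itself only ever goes through the constants.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;

public final class CustomResourceConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("yarn.resource-types", "custom_res_1,custom_res_2");
    conf.set("yarn.resource-types.custom_res_1.units", "G");
    conf.set("yarn.resource-types.custom_res_2.units", "G");
    // Re-read resource-type definitions so later Resource objects can
    // carry the two custom types.
    ResourceUtils.resetResourceTypes(conf);
  }
}
```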
@@ -200,8 +234,18 @@ private void setupCluster() throws IOException {
 
     // Create and add two nodes to the cluster, with capacities
     // disproportional to the container requests.
-    addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
-    addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
+    if (useCustomResources) {
+      ImmutableMap<String, String> customResources = ImmutableMap
+          .<String, String>builder().put(CUSTOM_RES_1, String.valueOf(40))
+          .put(CUSTOM_RES_2, String.valueOf(80)).build();
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE,
+          customResources);
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE,
+          customResources);
+    } else {
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
+    }
 
     // Reinitialize the scheduler so DRF policy picks up cluster capacity
     // TODO (YARN-6194): One shouldn't need to call this
@@ -236,9 +280,38 @@ private void takeAllResources(String queueName) {
     ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, 1,
         queueName, "default",
         NODE_CAPACITY_MULTIPLE * rmNodes.size());
+    takeResourcesInternal(queueName, appAttemptId);
+  }
+
+  private void takeAllResourcesOfCustomResource(String queueName,
+      String customRes) {
+    // Create an app that takes up all the resources of a custom resource
+    // on the cluster
+    int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size();
+    ResourceRequest resourceRequest =
+        createResourceRequest(0, 0, ResourceRequest.ANY, 1, numContainers,
+            true);
+    Resource capability = resourceRequest.getCapability();
+    resourceRequest.setCapability(
+        ResourceTypesTestHelper.newResource(capability.getMemorySize(),
+            capability.getVirtualCores(),
+            ImmutableMap.<String, String>builder()
+                .put(customRes, String.valueOf(10))
+                .build()));
+
+    ApplicationAttemptId appAttemptId = createSchedulingRequest(
+        Lists.newArrayList(resourceRequest), queueName, "default");
+    takeResourcesInternal(queueName, appAttemptId);
+  }
+
+  private void takeResourcesInternal(String queueName,
+      ApplicationAttemptId appAttemptId) {
     greedyApp = scheduler.getSchedulerApp(appAttemptId);
     scheduler.update();
     sendEnoughNodeUpdatesToAssignFully();
+
+    // cluster has 8GB of memory / 24 vcores in total
+    // requested app has 1GB of memory / 1 vcore --> 8 containers
     assertEquals(8, greedyApp.getLiveContainers().size());
     // Verify preemptable for queue and app attempt
     assertTrue(
@@ -251,11 +324,8 @@ private void takeAllResources(String queueName) {
    * cluster.
    *
    * @param queueName queue name
-   * @throws InterruptedException
-   *           if any thread has interrupted the current thread.
    */
-  private void preemptHalfResources(String queueName)
-      throws InterruptedException {
+  private void preemptHalfResources(String queueName) {
     ApplicationAttemptId appAttemptId = createSchedulingRequest(2 * GB, 2,
         queueName, "default",
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
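The assertEquals(8, ...) in takeResourcesInternal follows from simple arithmetic over the capacities configured above; the sketch below just spells the numbers out (two nodes, NODE_CAPACITY_MULTIPLE = 4, 40 units of custom_res_1 per node, greedy containers of 10 units each):

```java
// Worked numbers behind takeResourcesInternal's container count and the
// custom-resource scenario. Plain arithmetic; no YARN classes needed.
public final class PreemptionMathSketch {
  public static void main(String[] args) {
    int nodes = 2;                      // setupCluster() adds two nodes
    int clusterMemoryGb = nodes * 4;    // NODE_CAPACITY_MULTIPLE * 1 GB each
    int clusterCustomRes1 = nodes * 40; // from the ImmutableMap above

    // Plain scenario: 1 GB / 1 vcore containers, memory is the
    // bottleneck, so exactly 8 containers fit on the 8 GB cluster.
    System.out.println(clusterMemoryGb / 1);                   // 8

    // Custom-resource scenario: 8 containers of 10 custom_res_1 units
    // (and 0 MB / 0 vcores each) consume all 80 units, so custom_res_1
    // utilization hits 1.0 while memory and vcores stay at 0. Only the
    // new custom-resource check can trip the threshold of 0 set by
    // setup().
    System.out.println((float) (8 * 10) / clusterCustomRes1);  // 1.0
  }
}
```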
@@ -266,6 +336,29 @@ private void preemptHalfResources(String queueName)
     scheduler.update();
   }
 
+  private void preemptHalfResourcesCustomResource(String queueName,
+      String customRes) {
+    int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2;
+
+    // cannot request 0 memory as it will lead to division by zero when
+    // calculating Resource ratios
+    ResourceRequest resourceRequest = createResourceRequest(1, 0,
+        ResourceRequest.ANY, 1, numContainers, true);
+    Resource capability = resourceRequest.getCapability();
+    resourceRequest.setCapability(ResourceTypesTestHelper.newResource(
+        capability.getMemorySize(), capability.getVirtualCores(),
+        ImmutableMap.<String, String>builder()
+            .put(customRes, String.valueOf(20)).build()));
+
+    ApplicationAttemptId appAttemptId = createSchedulingRequest(
+        Lists.newArrayList(resourceRequest), queueName, "default");
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
+    scheduler.update();
+  }
+
   /**
    * Submit application to {@code queue1} and take over the entire cluster.
    * Submit application with larger containers to {@code queue2} that
@@ -275,11 +368,16 @@ private void preemptHalfResources(String queueName)
    * @param queue2 second queue
    * @throws InterruptedException if interrupted while waiting
    */
-  private void submitApps(String queue1, String queue2)
-      throws InterruptedException {
+  private void submitApps(String queue1, String queue2) {
     takeAllResources(queue1);
     preemptHalfResources(queue2);
   }
+
+  private void submitAppsAndTakeAllOfCustomResource(String queue1,
+      String queue2, String customRes) {
+    takeAllResourcesOfCustomResource(queue1, customRes);
+    preemptHalfResourcesCustomResource(queue2, customRes);
+  }
 
   private void verifyPreemption(int numStarvedAppContainers,
       int numGreedyAppContainers)
@@ -350,6 +448,21 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
     }
   }
 
+  @Test
+  public void testPreemptionWithinSameLeafQueueCustomResource()
+      throws Exception {
+    assumeThat(useCustomResources, is(true));
+    if (useCustomResources) {
+      String queue = "root.preemptable.child-1";
+      submitAppsAndTakeAllOfCustomResource(queue, queue, CUSTOM_RES_1);
+      if (fairsharePreemption) {
+        verifyPreemption(2, 4);
+      } else {
+        verifyNoPreemption();
+      }
+    }
+  }
+
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
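A note on the new test: assumeThat(useCustomResources, is(true)) marks the four non-custom-resource parameter rows as skipped rather than failed, so only the {"FairSharePreemptionWithDRF", 3, true} row exercises the body; the inner if (useCustomResources) guard is then redundant but harmless. A minimal JUnit 4 sketch of that skip behavior (class and field are hypothetical):

```java
import static org.hamcrest.core.Is.is;
import static org.junit.Assume.assumeThat;

import org.junit.Test;

// Minimal JUnit 4 sketch of the assumption pattern used above. When the
// assumption fails, the runner reports the test as skipped, not failed,
// which is how the non-custom-resource parameter rows are filtered out.
public class AssumeSkipSketchTest {
  // Hypothetical stand-in for the parameterized field in the real test.
  private final boolean useCustomResources = false;

  @Test
  public void runsOnlyWhenCustomResourcesAreEnabled() {
    assumeThat(useCustomResources, is(true));
    // Unreached in this sketch; the real test runs the preemption
    // scenario from here.
  }
}
```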