diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 9aeb51cc2cc..5f205f63649 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -379,8 +379,15 @@ public float ratio(Resource a, Resource b) {
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation aResourceInformation = a.getResourceInformation(i);
       ResourceInformation bResourceInformation = b.getResourceInformation(i);
-      float tmp = (float) aResourceInformation.getValue()
-          / (float) bResourceInformation.getValue();
+
+      final float tmp;
+      // avoid division by zero
+      if (bResourceInformation.getValue() != 0) {
+        tmp = (float) aResourceInformation.getValue()
+            / (float) bResourceInformation.getValue();
+      } else {
+        tmp = 0;
+      }
       ratio = ratio > tmp ? ratio : tmp;
     }
     return ratio;
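For context, a minimal sketch (not part of the patch; resource values assumed) of how the guarded ratio behaves once this hunk is applied:

    DominantResourceCalculator drc = new DominantResourceCalculator();
    Resource a = Resources.createResource(2048, 2); // memory: 2048, vcores: 2
    Resource b = Resources.createResource(1024, 0); // memory: 1024, vcores: 0
    // Previously the vcores term evaluated 2 / 0 and poisoned the result
    // with Infinity; with the guard that term contributes 0 instead.
    float ratio = drc.ratio(a, b); // max(2048/1024, 0) = 2.0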
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/PreemptionResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/PreemptionResourceCalculator.java
new file mode 100644
index 00000000000..98512920181
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/PreemptionResourceCalculator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+
+/**
+ * A {@link ResourceCalculator} which uses the concept of
+ * dominant resource to compare multi-dimensional resources.
+ * It differs from {@link DominantResourceCalculator} only in the ratio
+ * calculation; see the javadoc of {@link #ratio(Resource, Resource)}.
+ */
+@Private
+@Unstable
+public class PreemptionResourceCalculator extends DefaultResourceCalculator {
+
+  /**
+   * Computes the ratio of a to b, with respect to all resource types.
+   * Returns 0 if, for any resource type, the non-zero value of 'a' is
+   * smaller than the corresponding value of 'b'.
+   * Example:
+   * Resource a: (memory: 1024, vcores: 2)
+   * Resource b: (memory: 2048, vcores: 2)
+   * Without the additional condition, this method would have returned 1,
+   * but that is wrong, as the memory of resource 'b' cannot fit into
+   * resource 'a'.
+   */
+  @Override
+  public float ratio(Resource a, Resource b) {
+    float ratio = 0.0f;
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation aResourceInformation = a.getResourceInformation(i);
+      ResourceInformation bResourceInformation = b.getResourceInformation(i);
+
+      float tmp;
+      // avoid division by zero
+      if (bResourceInformation.getValue() != 0) {
+        tmp = (float) aResourceInformation.getValue()
+            / (float) bResourceInformation.getValue();
+      } else {
+        tmp = 0;
+      }
+
+      if (aResourceInformation.getValue() > 0 && tmp < 1) {
+        return 0;
+      }
+
+      ratio = ratio > tmp ? ratio : tmp;
+    }
+    return ratio;
+  }
+}
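A small usage sketch (illustrative, not part of the patch) showing the extra condition in action, assuming only the two mandatory resource types are registered:

    PreemptionResourceCalculator calc = new PreemptionResourceCalculator();
    Resource a = Resources.createResource(1024, 2);
    Resource b = Resources.createResource(2048, 2);
    // The plain dominant ratio would be max(1024/2048, 2/2) = 1, but since
    // a's memory (1024) is positive yet smaller than b's (2048), the
    // calculator short-circuits and returns 0.
    float r = calc.ratio(a, b); // 0.0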
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 48c2c364ae9..7e0db89b013 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -298,11 +298,18 @@ public static Resource subtract(Resource lhs, Resource rhs) {
    */
   public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
     subtractFrom(lhs, rhs);
-    if (lhs.getMemorySize() < 0) {
-      lhs.setMemorySize(0);
-    }
-    if (lhs.getVirtualCores() < 0) {
-      lhs.setVirtualCores(0);
+
+    // make sure all resources, custom ones included, are at least zero
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      try {
+        ResourceInformation lhsValue = lhs.getResourceInformation(i);
+        if (lhsValue.getValue() < 0) {
+          lhs.setResourceValue(i, 0);
+        }
+      } catch (ResourceNotFoundException ye) {
+        LOG.warn("Resource is missing: " + ye.getMessage());
+      }
     }
     return lhs;
   }
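The practical effect: subtracting a larger allocation now clamps every resource type at zero, not only memory and vcores. A quick sketch (illustrative; assumes a custom resource named "gpu" has been registered via yarn.resource-types):

    Resource lhs = Resources.createResource(1024, 1);
    lhs.setResourceValue("gpu", 1);
    Resource rhs = Resources.createResource(2048, 4);
    rhs.setResourceValue("gpu", 3);
    Resources.subtractFromNonNegative(lhs, rhs);
    // lhs is now <memory: 0, vcores: 0, gpu: 0>; before this patch the
    // gpu value would have been left at -2.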
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index da5e4c9347e..4e67da85d73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
@@ -94,6 +95,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.PreemptionResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -151,6 +153,9 @@
   private static final ResourceCalculator RESOURCE_CALCULATOR =
       new DefaultResourceCalculator();
+
+  private static final ResourceCalculator PREEMPTION_RESOURCE_CALCULATOR =
+      new PreemptionResourceCalculator();
   private static final ResourceCalculator DOMINANT_RESOURCE_CALCULATOR =
       new DominantResourceCalculator();
@@ -1184,6 +1189,9 @@ public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
 
   @Override
   public ResourceCalculator getResourceCalculator() {
+    if (this.conf.getPreemptionEnabled()) {
+      return PREEMPTION_RESOURCE_CALCULATOR;
+    }
     return RESOURCE_CALCULATOR;
   }
@@ -1207,15 +1215,31 @@ private void updateRootQueueMetrics() {
    */
   private boolean shouldAttemptPreemption() {
     if (context.isPreemptionEnabled()) {
-      return (context.getPreemptionUtilizationThreshold() < Math.max(
-          (float) rootMetrics.getAllocatedMB() /
-              getClusterResource().getMemorySize(),
-          (float) rootMetrics.getAllocatedVirtualCores() /
-              getClusterResource().getVirtualCores()));
+      final Resource clusterResource = getClusterResource();
+      float utilization =
+          getMaxUtilizationOfAllocatedResources(clusterResource);
+      return (context.getPreemptionUtilizationThreshold() < utilization);
     }
     return false;
   }
 
+  @VisibleForTesting
+  float getMaxUtilizationOfAllocatedResources(
+      Resource clusterResource) {
+    final Resource allocated = rootMetrics.getAllocatedResources();
+    float maxUtilization = 0;
+    for (ResourceInformation res : allocated.getResources()) {
+      long clusterValue = clusterResource.getResourceValue(res.getName());
+      if (clusterValue != 0) {
+        float utilization = (float) res.getValue() / clusterValue;
+        if (utilization > maxUtilization) {
+          maxUtilization = utilization;
+        }
+      }
+    }
+    return maxUtilization;
+  }
+
   @Override
   public QueueMetrics getRootQueueMetrics() {
     return rootMetrics;
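To make the new check concrete, a worked example with assumed values: cluster capacity (memory: 8192, vcores: 24, gpu: 4) and root-queue allocation (memory: 2048, vcores: 18, gpu: 1):

    // yarn.scheduler.fair.preemption.cluster-utilization-threshold
    float threshold = 0.5f;
    // per-resource utilization: 2048/8192 = 0.25, 18/24 = 0.75, 1/4 = 0.25
    float maxUtilization = Math.max(2048f / 8192f,
        Math.max(18f / 24f, 1f / 4f)); // 0.75
    boolean attemptPreemption = threshold < maxUtilization; // true

Before the patch only memory and vcores were considered, so a cluster saturated on a custom resource alone would never cross the threshold.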
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerPreemptionTestContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerPreemptionTestContext.java
new file mode 100644
index 00000000000..526c5c1c428
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerPreemptionTestContext.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.extractCustomResourcesAsStrings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class that contains data and helper methods to group preemption-related
+ * test functionality.
+ */
+public class FairSchedulerPreemptionTestContext extends FairSchedulerTestBase {
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FairSchedulerPreemptionTestContext.class);
+
+  // Scheduler clock
+  private final ControlledClock clock = new ControlledClock();
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  static final int NODE_CAPACITY_MULTIPLE = 4;
+  static final int GB = 1024;
+
+  private final boolean fairsharePreemption;
+  private final boolean drf;
+
+  // App that takes up the entire cluster
+  private FSAppAttempt greedyApp;
+
+  // Starving app that is expected to instigate preemption
+  private FSAppAttempt starvingApp;
+  private Resource totalClusterCapacity;
+
+  FairSchedulerPreemptionTestContext(int mode) {
+    this.fairsharePreemption = (mode > 1); // 2 and 3
+    this.drf = (mode % 2 == 1); // 1 and 3
+  }
+
+  void setup(String additionalQueueProperties,
+      Resource resourcePerNode) throws IOException {
+    this.conf = createConfig();
+    writeAllocFile(additionalQueueProperties);
+    setupCluster(resourcePerNode);
+    computeTotalClusterCapacity();
+    LOG.debug("Total cluster capacity: " + totalClusterCapacity);
+  }
+
+  void setup(Configuration conf, String additionalQueueProperties,
+      Resource resourcePerNode) throws IOException {
+    this.conf = conf;
+    writeAllocFile(additionalQueueProperties);
+    setupCluster(resourcePerNode);
+    computeTotalClusterCapacity();
+    LOG.debug("Total cluster capacity: " + totalClusterCapacity);
+  }
+
+  Configuration createConfig() {
+    createConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+    conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    return conf;
+  }
+
+  void computeTotalClusterCapacity() {
+    totalClusterCapacity = Resources.createResource(0, 0);
+    for (RMNode rmNode : rmNodes) {
+      Resources.addTo(totalClusterCapacity, rmNode.getTotalCapability());
+    }
+  }
+
+  void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  boolean isFairsharePreemption() {
+    return fairsharePreemption;
+  }
+
+  private void writeAllocFile(String additionalQueueProperties)
+      throws IOException {
+    /*
+     * Queue hierarchy:
+     * root
+     * |--- preemptable
+     *      |--- child-1
+     *      |--- child-2
+     * |--- preemptable-sibling
+     * |--- nonpreemptable
+     *      |--- child-1
+     *      |--- child-2
+     */
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>");
+    out.println("<allocations>");
+
+    out.println("<queue name=\"preemptable\">");
+    writePreemptionParams(out);
+
+    // Child-1
+    out.println("<queue name=\"child-1\">");
+    writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+    // Child-2
+    out.println("<queue name=\"child-2\">");
+    writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of preemptable queue
+
+    out.println("<queue name=\"preemptable-sibling\">");
+    writePreemptionParams(out);
+    out.println("</queue>");
+
+    // Queue with preemption disallowed
+    out.println("<queue name=\"nonpreemptable\">");
+    out.println("<allowPreemptionFrom>false" +
+        "</allowPreemptionFrom>");
+    writePreemptionParams(out);
+
+    // Child-1
+    out.println("<queue name=\"child-1\">");
+    writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+    // Child-2
+    out.println("<queue name=\"child-2\">");
+    writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of nonpreemptable queue
+
+    if (drf) {
+      out.println("<defaultQueueSchedulingPolicy>drf" +
+          "</defaultQueueSchedulingPolicy>");
+    }
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+  }
+
+  private void writePreemptionParams(PrintWriter out) {
+    if (fairsharePreemption) {
+      out.println("<fairSharePreemptionThreshold>1" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<fairSharePreemptionTimeout>0" +
+          "</fairSharePreemptionTimeout>");
+    } else {
+      out.println("<minSharePreemptionTimeout>0" +
+          "</minSharePreemptionTimeout>");
+    }
+  }
+
+  private void writeResourceParams(PrintWriter out, String
+      additionalQueueProperties) {
+    if (additionalQueueProperties != null &&
+        !additionalQueueProperties.isEmpty()) {
+      out.println(additionalQueueProperties);
+    }
+  }
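For orientation, the allocations file produced for the min-share + DRF mode (mode 1), with MIN_RESOURCES passed as additionalQueueProperties, would look roughly like this (a sketch; exact contents depend on the arguments):

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <allocations>
      <queue name="preemptable">
        <minSharePreemptionTimeout>0</minSharePreemptionTimeout>
        <queue name="child-1">
          <minResources>4096mb,4vcores</minResources>
        </queue>
        <queue name="child-2">
          <minResources>4096mb,4vcores</minResources>
        </queue>
      </queue>
      <queue name="preemptable-sibling">
        <minSharePreemptionTimeout>0</minSharePreemptionTimeout>
      </queue>
      <queue name="nonpreemptable">
        <allowPreemptionFrom>false</allowPreemptionFrom>
        <minSharePreemptionTimeout>0</minSharePreemptionTimeout>
        <queue name="child-1">
          <minResources>4096mb,4vcores</minResources>
        </queue>
        <queue name="child-2">
          <minResources>4096mb,4vcores</minResources>
        </queue>
      </queue>
      <defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
    </allocations>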
+
+  void setupCluster(Resource resourcePerNode) throws IOException {
+    rmNodes.clear();
+    resourceManager = new MockRM(conf);
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    // YARN-6249, FSLeafQueue#lastTimeAtMinShare is initialized to the time in
+    // the real world, so we should keep the clock up with it.
+    clock.setTime(SystemClock.getInstance().getTime());
+    scheduler.setClock(clock);
+    resourceManager.start();
+
+    int memory = (int) resourcePerNode.getMemorySize();
+    int vCores = resourcePerNode.getVirtualCores();
+    if (resourcePerNode.getResources().length > 2) {
+      Map<String, String> customResources = extractCustomResourcesAsStrings(
+          resourcePerNode);
+      addNode(memory, vCores, customResources);
+      addNode(memory, vCores, customResources);
+    } else {
+      addNode(memory, vCores);
+      addNode(memory, vCores);
+    }
+
+    // Reinitialize the scheduler so DRF policy picks up cluster capacity
+    // TODO (YARN-6194): One shouldn't need to call this
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Verify that nonpreemptable.child-1 and child-2 are not preemptable
+    FSQueue child1 =
+        scheduler.getQueueManager().getQueue("nonpreemptable.child-1");
+    assertFalse(child1.isPreemptable());
+    FSQueue child2 =
+        scheduler.getQueueManager().getQueue("nonpreemptable.child-2");
+    assertFalse(child2.isPreemptable());
+  }
+
+  private void sendEnoughNodeUpdatesToAssignFully() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+
+  /**
+   * Set the number of AM containers for each node.
+   *
+   * @param numAMContainersPerNode number of AM containers per node
+   */
+  void setNumAMContainersPerNode(int numAMContainersPerNode) {
+    List<FSSchedulerNode> potentialNodes =
+        scheduler.getNodeTracker().getNodesByResourceName("*");
+    for (FSSchedulerNode node: potentialNodes) {
+      List<RMContainer> containers =
+          node.getCopiedListOfRunningContainers();
+      // Change the first numAMContainersPerNode out of 4 containers to
+      // AM containers
+      for (int i = 0; i < numAMContainersPerNode; i++) {
+        ((RMContainerImpl) containers.get(i)).setAMContainer(true);
+      }
+    }
+  }
+
+  void setAllAMContainersOnNode(NodeId nodeId) {
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
+      ((RMContainerImpl) container).setAMContainer(true);
+    }
+  }
+
+  List<RMContainer> getLiveContainersOfStarvingApp() {
+    return (ArrayList<RMContainer>) starvingApp.getLiveContainers();
+  }
+
+  RMNode getRMNode(int i) {
+    return rmNodes.get(i);
+  }
+
+  void setStarvingApp(ApplicationAttemptId starvedAppAttemptId) {
+    starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
+  }
+
+  void identifyStarvation() {
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
+    scheduler.update();
+  }
+
+  void tryPreemptMoreThanFairShare(String queueName)
+      throws InterruptedException {
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(3 * GB, 3, queueName, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+    verifyPreemption(1, 5);
+  }
+
+  float getMaxUtilizationOfAllocatedResources() {
+    return scheduler
+        .getMaxUtilizationOfAllocatedResources(totalClusterCapacity);
+  }
+ assertEquals("Incorrect # of preempted containers in QueueMetrics", + 8 - numGreedyAppContainers, + greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers()); + + // Verify the node is reserved for the starvingApp + for (RMNode rmNode : rmNodes) { + FSSchedulerNode node = (FSSchedulerNode) + scheduler.getNodeTracker().getNode(rmNode.getNodeID()); + if (node.getContainersForPreemption().size() > 0) { + assertTrue("node should be reserved for the starvingApp", + node.getPreemptionList().keySet().contains(starvingApp)); + } + } + + sendEnoughNodeUpdatesToAssignFully(); + + // Verify the preempted containers are assigned to starvingApp + assertEquals("Starved app is not assigned the right # of containers", + numStarvedAppContainers, starvingApp.getLiveContainers().size()); + + // Verify the node is not reserved for the starvingApp anymore + for (RMNode rmNode : rmNodes) { + FSSchedulerNode node = (FSSchedulerNode) + scheduler.getNodeTracker().getNode(rmNode.getNodeID()); + if (node.getContainersForPreemption().size() > 0) { + assertFalse(node.getPreemptionList().keySet().contains(starvingApp)); + } + } + } + + void verifyNoPreemption() throws InterruptedException { + // Sleep long enough to ensure not even one container is preempted. + for (int i = 0; i < 100; i++) { + if (greedyApp.getLiveContainers().size() != 8) { + break; + } + Thread.sleep(10); + } + assertEquals(8, greedyApp.getLiveContainers().size()); + } + + public Resource getTotalClusterCapacity() { + return totalClusterCapacity; + } + + public int getSizeOfRMNodes() { + return rmNodes.size(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 4f1f20b942b..27edf371cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -60,6 +61,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; public class FairSchedulerTestBase { public final static String TEST_DIR = @@ -134,6 +136,22 @@ protected ResourceRequest createResourceRequest( return request; } + protected ResourceRequest createResourceRequest( + Resource capability, String host, int priority, int numContainers, + boolean relaxLocality) { + ResourceRequest request = recordFactory + .newRecordInstance(ResourceRequest.class); + request.setCapability(capability); + request.setResourceName(host); + request.setNumContainers(numContainers); + Priority prio = 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 4f1f20b942b..27edf371cc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -60,6 +61,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 public class FairSchedulerTestBase {
   public final static String TEST_DIR =
@@ -134,6 +136,22 @@ protected ResourceRequest createResourceRequest(
     return request;
   }
 
+  protected ResourceRequest createResourceRequest(
+      Resource capability, String host, int priority, int numContainers,
+      boolean relaxLocality) {
+    ResourceRequest request = recordFactory
+        .newRecordInstance(ResourceRequest.class);
+    request.setCapability(capability);
+    request.setResourceName(host);
+    request.setNumContainers(numContainers);
+    Priority prio = recordFactory.newRecordInstance(Priority.class);
+    prio.setPriority(priority);
+    request.setPriority(prio);
+    request.setRelaxLocality(relaxLocality);
+    request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    return request;
+  }
+
   /**
    * Creates a single container priority-1 request and submits to
    * scheduler.
@@ -158,6 +176,12 @@ protected ApplicationAttemptId createSchedulingRequest(
     return createSchedulingRequest(memory, vcores, queueId, userId,
         numContainers, 1);
   }
 
+  protected ApplicationAttemptId createSchedulingRequest(
+      Resource capability, String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(capability, queueId, userId, numContainers,
+        1);
+  }
+
   protected ApplicationAttemptId createSchedulingRequest(
       int memory, String queueId, String userId, int numContainers,
       int priority) {
     return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
@@ -173,6 +197,15 @@ protected ApplicationAttemptId createSchedulingRequest(
         userId);
   }
 
+  protected ApplicationAttemptId createSchedulingRequest(
+      Resource capability, String queueId, String userId, int numContainers,
+      int priority) {
+    ResourceRequest request = createResourceRequest(capability,
+        ResourceRequest.ANY, priority, numContainers, true);
+    return createSchedulingRequest(Lists.newArrayList(request), queueId,
+        userId);
+  }
+
   protected ApplicationAttemptId createSchedulingRequest(
       Collection<ResourceRequest> requests, String queueId, String userId) {
     ApplicationAttemptId id =
@@ -357,4 +390,15 @@ protected void addNode(int memory, int cores) {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
     rmNodes.add(node);
   }
+
+  protected void addNode(int memory, int cores,
+      Map<String, String> customResources) {
+    int id = rmNodes.size() + 1;
+    RMNode node = MockNodes.newNodeInfo(1,
+        ResourceTypesTestHelper.newResource(memory, cores, customResources),
+        id, "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
 }
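A hypothetical use of the new Resource-based overloads from a test (assumes Guava's ImmutableMap and a registered custom resource "custom_res_1"):

    Resource capability = ResourceTypesTestHelper.newResource(1024, 1,
        ImmutableMap.of("custom_res_1", "10"));
    ApplicationAttemptId attemptId =
        createSchedulingRequest(capability, "root.queue1", "default", 4);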
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..2b5cca587a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -18,56 +18,37 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerPreemptionTestContext.GB;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+    .FairSchedulerPreemptionTestContext.NODE_CAPACITY_MULTIPLE;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests to verify fairshare and minshare preemption, using parameterization.
  */
 @RunWith(Parameterized.class)
-public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
-  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
-  private static final int GB = 1024;
-
-  // Scheduler clock
-  private final ControlledClock clock = new ControlledClock();
-
-  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
-  private static final int NODE_CAPACITY_MULTIPLE = 4;
-
-  private final boolean fairsharePreemption;
-  private final boolean drf;
+public class TestFairSchedulerPreemption {
+  private final FairSchedulerPreemptionTestContext ctx;
 
-  // App that takes up the entire cluster
-  private FSAppAttempt greedyApp;
-
-  // Starving app that is expected to instigate preemption
-  private FSAppAttempt starvingApp;
+  private static final String MIN_RESOURCES =
+      "<minResources>4096mb,4vcores</minResources>";
 
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> getParameters() {
@@ -75,335 +56,83 @@
       {"MinSharePreemption", 0},
       {"MinSharePreemptionWithDRF", 1},
       {"FairSharePreemption", 2},
-      {"FairSharePreemptionWithDRF", 3}
+      {"FairSharePreemptionWithDRF", 3},
     });
   }
 
-  public TestFairSchedulerPreemption(String name, int mode)
-      throws IOException {
-    fairsharePreemption = (mode > 1); // 2 and 3
-    drf = (mode % 2 == 1); // 1 and 3
-    writeAllocFile();
+  public TestFairSchedulerPreemption(String name, int mode) {
+    this.ctx = new FairSchedulerPreemptionTestContext(mode);
   }
 
   @Before
   public void setup() throws IOException {
-    createConfiguration();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
-        ALLOC_FILE.getAbsolutePath());
-    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
-    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
-    conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
-    setupCluster();
-  }
-
-  @After
-  public void teardown() {
-    ALLOC_FILE.delete();
-    conf = null;
-    if (resourceManager != null) {
-      resourceManager.stop();
-      resourceManager = null;
-    }
-  }
-
-  private void writeAllocFile() throws IOException {
-    /*
-     * Queue hierarchy:
-     * root
-     * |--- preemptable
-     *      |--- child-1
-     *      |--- child-2
-     * |--- preemptable-sibling
-     * |--- nonpreemptible
-     *      |--- child-1
-     *      |--- child-2
-     */
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>");
-    out.println("<allocations>");
-
-    out.println("<queue name=\"preemptable\">");
-    writePreemptionParams(out);
-
-    // Child-1
-    out.println("<queue name=\"child-1\">");
-    writeResourceParams(out);
-    out.println("</queue>");
-
-    // Child-2
-    out.println("<queue name=\"child-2\">");
-    writeResourceParams(out);
-    out.println("</queue>");
-
-    out.println("</queue>"); // end of preemptable queue
-
-    out.println("<queue name=\"preemptable-sibling\">");
-    writePreemptionParams(out);
-    out.println("</queue>");
-
-    // Queue with preemption disallowed
-    out.println("<queue name=\"nonpreemptable\">");
-    out.println("<allowPreemptionFrom>false" +
-        "</allowPreemptionFrom>");
-    writePreemptionParams(out);
-
-    // Child-1
-    out.println("<queue name=\"child-1\">");
-    writeResourceParams(out);
-    out.println("</queue>");
-
-    // Child-2
-    out.println("<queue name=\"child-2\">");
-    writeResourceParams(out);
-    out.println("</queue>");
-
-    out.println("</queue>"); // end of nonpreemptable queue
-
-    if (drf) {
-      out.println("<defaultQueueSchedulingPolicy>drf" +
-          "</defaultQueueSchedulingPolicy>");
-    }
-    out.println("</allocations>");
-    out.close();
-
-    assertTrue("Allocation file does not exist, not running the test",
-        ALLOC_FILE.exists());
-  }
-
-  private void writePreemptionParams(PrintWriter out) {
-    if (fairsharePreemption) {
-      out.println("<fairSharePreemptionThreshold>1" +
-          "</fairSharePreemptionThreshold>");
-      out.println("<fairSharePreemptionTimeout>0" +
-          "</fairSharePreemptionTimeout>");
-    } else {
-      out.println("<minSharePreemptionTimeout>0" +
-          "</minSharePreemptionTimeout>");
-    }
-  }
-
-  private void writeResourceParams(PrintWriter out) {
-    if (!fairsharePreemption) {
-      out.println("<minResources>4096mb,4vcores</minResources>");
-    }
-  }
-
-  private void setupCluster() throws IOException {
-    resourceManager = new MockRM(conf);
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-    // YARN-6249, FSLeafQueue#lastTimeAtMinShare is initialized to the time in
-    // the real world, so we should keep the clock up with it.
-    clock.setTime(SystemClock.getInstance().getTime());
-    scheduler.setClock(clock);
-    resourceManager.start();
-
     // Create and add two nodes to the cluster, with capacities
     // disproportional to the container requests.
-    addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
-    addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
-
-    // Reinitialize the scheduler so DRF policy picks up cluster capacity
-    // TODO (YARN-6194): One shouldn't need to call this
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Verify if child-1 and child-2 are preemptable
-    FSQueue child1 =
-        scheduler.getQueueManager().getQueue("nonpreemptable.child-1");
-    assertFalse(child1.isPreemptable());
-    FSQueue child2 =
-        scheduler.getQueueManager().getQueue("nonpreemptable.child-2");
-    assertFalse(child2.isPreemptable());
+    Resource resourcePerNode = ResourceTypesTestHelper.newResource(
+        NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE,
+        Collections.emptyMap());
+    this.ctx.setup(additionalQueueProperties(), resourcePerNode);
   }
 
-  private void sendEnoughNodeUpdatesToAssignFully() {
-    for (RMNode node : rmNodes) {
-      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
-          new NodeUpdateSchedulerEvent(node);
-      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
-        scheduler.handle(nodeUpdateSchedulerEvent);
-      }
+  private String additionalQueueProperties() {
+    if (!ctx.isFairsharePreemption()) {
+      return MIN_RESOURCES;
     }
+    return "";
   }
 
-  /**
-   * Submit an application to a given queue and take over the entire cluster.
-   *
-   * @param queueName queue name
-   */
-  private void takeAllResources(String queueName) {
-    // Create an app that takes up all the resources on the cluster
-    ApplicationAttemptId appAttemptId
-        = createSchedulingRequest(GB, 1, queueName, "default",
-        NODE_CAPACITY_MULTIPLE * rmNodes.size());
-    greedyApp = scheduler.getSchedulerApp(appAttemptId);
-    scheduler.update();
-    sendEnoughNodeUpdatesToAssignFully();
-    assertEquals(8, greedyApp.getLiveContainers().size());
-    // Verify preemptable for queue and app attempt
-    assertTrue(
-        scheduler.getQueueManager().getQueue(queueName).isPreemptable()
-            == greedyApp.isPreemptable());
-  }
-
-  /**
-   * Submit an application to a given queue and preempt half resources of the
-   * cluster.
-   *
-   * @param queueName queue name
-   * @throws InterruptedException
-   *           if any thread has interrupted the current thread.
-   */
-  private void preemptHalfResources(String queueName)
-      throws InterruptedException {
-    ApplicationAttemptId appAttemptId
-        = createSchedulingRequest(2 * GB, 2, queueName, "default",
-        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
-    starvingApp = scheduler.getSchedulerApp(appAttemptId);
-
-    // Move clock enough to identify starvation
-    clock.tickSec(1);
-    scheduler.update();
-  }
-
-  /**
-   * Submit application to {@code queue1} and take over the entire cluster.
-   * Submit application with larger containers to {@code queue2} that
-   * requires preemption from the first application.
-   *
-   * @param queue1 first queue
-   * @param queue2 second queue
-   * @throws InterruptedException if interrupted while waiting
-   */
-  private void submitApps(String queue1, String queue2)
-      throws InterruptedException {
-    takeAllResources(queue1);
-    preemptHalfResources(queue2);
-  }
-
-  private void verifyPreemption(int numStarvedAppContainers,
-      int numGreedyAppContainers)
-      throws InterruptedException {
-    // Sleep long enough for four containers to be preempted.
-    for (int i = 0; i < 1000; i++) {
-      if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
-        break;
-      }
-      Thread.sleep(10);
-    }
-
-    // Post preemption, verify the greedyApp has the correct # of containers.
-    assertEquals("Incorrect # of containers on the greedy app",
-        numGreedyAppContainers, greedyApp.getLiveContainers().size());
-
-    // Verify the queue metrics are set appropriately. The greedyApp started
-    // with 8 1GB, 1vcore containers.
-    assertEquals("Incorrect # of preempted containers in QueueMetrics",
-        8 - numGreedyAppContainers,
-        greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
-
-    // Verify the node is reserved for the starvingApp
-    for (RMNode rmNode : rmNodes) {
-      FSSchedulerNode node = (FSSchedulerNode)
-          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
-      if (node.getContainersForPreemption().size() > 0) {
-        assertTrue("node should be reserved for the starvingApp",
-            node.getPreemptionList().keySet().contains(starvingApp));
-      }
-    }
-
-    sendEnoughNodeUpdatesToAssignFully();
-
-    // Verify the preempted containers are assigned to starvingApp
-    assertEquals("Starved app is not assigned the right # of containers",
-        numStarvedAppContainers, starvingApp.getLiveContainers().size());
-
-    // Verify the node is not reserved for the starvingApp anymore
-    for (RMNode rmNode : rmNodes) {
-      FSSchedulerNode node = (FSSchedulerNode)
-          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
-      if (node.getContainersForPreemption().size() > 0) {
-        assertFalse(node.getPreemptionList().keySet().contains(starvingApp));
-      }
-    }
-  }
-
-  private void verifyNoPreemption() throws InterruptedException {
-    // Sleep long enough to ensure not even one container is preempted.
-    for (int i = 0; i < 100; i++) {
-      if (greedyApp.getLiveContainers().size() != 8) {
-        break;
-      }
-      Thread.sleep(10);
-    }
-    assertEquals(8, greedyApp.getLiveContainers().size());
+  @After
+  public void teardown() {
+    this.ctx.teardown();
   }
 
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
     String queue = "root.preemptable.child-1";
-    submitApps(queue, queue);
-    if (fairsharePreemption) {
-      verifyPreemption(2, 4);
+    ctx.submitApps(queue, queue);
+
+    if (ctx.isFairsharePreemption()) {
+      ctx.verifyPreemption(2, 4);
     } else {
-      verifyNoPreemption();
+      ctx.verifyNoPreemption();
     }
   }
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    submitApps("root.preemptable.child-1", "root.preemptable.child-2");
-    verifyPreemption(2, 4);
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+
+    ctx.submitApps(queue1, queue2);
+    ctx.verifyPreemption(2, 4);
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
-    verifyPreemption(2, 4);
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.nonpreemptable.child-1";
+
+    ctx.submitApps(queue1, queue2);
+    ctx.verifyPreemption(2, 4);
   }
 
   @Test
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
-    submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
-    verifyNoPreemption();
-  }
-
-  /**
-   * Set the number of AM containers for each node.
-   *
-   * @param numAMContainersPerNode number of AM containers per node
-   */
-  private void setNumAMContainersPerNode(int numAMContainersPerNode) {
-    List<FSSchedulerNode> potentialNodes =
-        scheduler.getNodeTracker().getNodesByResourceName("*");
-    for (FSSchedulerNode node: potentialNodes) {
-      List<RMContainer> containers =
-          node.getCopiedListOfRunningContainers();
-      // Change the first numAMContainersPerNode out of 4 containers to
-      // AM containers
-      for (int i = 0; i < numAMContainersPerNode; i++) {
-        ((RMContainerImpl) containers.get(i)).setAMContainer(true);
-      }
-    }
-  }
-
-  private void setAllAMContainersOnNode(NodeId nodeId) {
-    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
-    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
-      ((RMContainerImpl) container).setAMContainer(true);
-    }
+    ctx.submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
+    ctx.verifyNoPreemption();
   }
 
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    setNumAMContainersPerNode(2);
-    preemptHalfResources("root.preemptable.child-2");
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+    ctx.takeAllResources(queue1);
+    ctx.setNumAMContainersPerNode(2);
+    ctx.preemptHalfResources(queue2);
 
-    verifyPreemption(2, 4);
+    ctx.verifyPreemption(2, 4);
 
-    ArrayList<RMContainer> containers =
-        (ArrayList<RMContainer>) starvingApp.getLiveContainers();
+    List<RMContainer> containers = ctx.getLiveContainersOfStarvingApp();
     String host0 = containers.get(0).getNodeId().getHost();
     String host1 = containers.get(1).getNodeId().getHost();
     // Each node provides two and only two non-AM containers to be preempted, so
@@ -414,10 +143,12 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
 
   @Test
   public void testRelaxLocalityToNotPreemptAM() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    RMNode node1 = rmNodes.get(0);
-    setAllAMContainersOnNode(node1.getNodeID());
-    SchedulerNode node =
-        scheduler.getNodeTracker().getNode(node1.getNodeID());
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+    ctx.takeAllResources(queue1);
+    RMNode node1 = ctx.getRMNode(0);
+    ctx.setAllAMContainersOnNode(node1.getNodeID());
+    SchedulerNode node =
+        ctx.scheduler.getNodeTracker().getNode(node1.getNodeID());
     ApplicationAttemptId greedyAppAttemptId =
         node.getCopiedListOfRunningContainers().get(0)
             .getApplicationAttemptId();
@@ -426,22 +157,19 @@ public void testRelaxLocalityToNotPreemptAM() throws Exception {
     // satisfied. This forces the RR that we consider for preemption to be the
     // NODE_LOCAL one.
     ResourceRequest nodeRequest =
-        createResourceRequest(GB, node1.getHostName(), 1, 4, true);
+        ctx.createResourceRequest(GB, node1.getHostName(), 1, 4, true);
     ResourceRequest rackRequest =
-        createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
+        ctx.createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
     ResourceRequest anyRequest =
-        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
+        ctx.createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
 
     List<ResourceRequest> resourceRequests =
         Arrays.asList(nodeRequest, rackRequest, anyRequest);
 
-    ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
-        "root.preemptable.child-2", "default", resourceRequests);
-    starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
-
-    // Move clock enough to identify starvation
-    clock.tickSec(1);
-    scheduler.update();
+    ApplicationAttemptId starvedAppAttemptId = ctx.createSchedulingRequest(
+        queue2, "default", resourceRequests);
+    ctx.setStarvingApp(starvedAppAttemptId);
+    ctx.identifyStarvation();
 
     // Make sure 4 containers were preempted from the greedy app, but also that
     // none were preempted on our all-AM node, even though the NODE_LOCAL RR
     // asked for resources on it.
 
     // TODO (YARN-7655) The starved app should be allocated 4 containers.
     // It should be possible to modify the RRs such that this is true
     // after YARN-7903.
-    verifyPreemption(0, 4);
+    ctx.verifyPreemption(0, 4);
     for (RMContainer container : node.getCopiedListOfRunningContainers()) {
       assert (container.isAMContainer());
       assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
@@ -459,37 +187,30 @@ public void testRelaxLocalityToNotPreemptAM() throws Exception {
 
   @Test
   public void testAppNotPreemptedBelowFairShare() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    tryPreemptMoreThanFairShare("root.preemptable.child-2");
-  }
-
-  private void tryPreemptMoreThanFairShare(String queueName)
-      throws InterruptedException {
-    ApplicationAttemptId appAttemptId
-        = createSchedulingRequest(3 * GB, 3, queueName, "default",
-        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
-    starvingApp = scheduler.getSchedulerApp(appAttemptId);
-
-    verifyPreemption(1, 5);
+    ctx.takeAllResources("root.preemptable.child-1");
+    ctx.tryPreemptMoreThanFairShare("root.preemptable.child-2");
   }
 
   @Test
   public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
       throws InterruptedException {
     // Run this test only for fairshare preemption
-    if (!fairsharePreemption) {
+    if (!ctx.isFairsharePreemption()) {
       return;
     }
 
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable-sibling";
+    String queue3 = "root.preemptable.child-2";
     // Let one of the child queues take over the entire cluster
-    takeAllResources("root.preemptable.child-1");
+    ctx.takeAllResources(queue1);
 
     // Submit a job so half the resources go to parent's sibling
-    preemptHalfResources("root.preemptable-sibling");
-    verifyPreemption(2, 4);
+    ctx.preemptHalfResources(queue2);
+    ctx.verifyPreemption(2, 4);
 
     // Submit a job to the child's sibling to force preemption from the child
-    preemptHalfResources("root.preemptable.child-2");
-    verifyPreemption(1, 2);
+    ctx.preemptHalfResources(queue3);
+    ctx.verifyPreemption(1, 2);
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemptionCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemptionCustomResources.java
new file mode 100644
index 00000000000..92e86af5a15
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemptionCustomResources.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+    .FairSchedulerPreemptionTestContext.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+    .FairSchedulerPreemptionTestContext.NODE_CAPACITY_MULTIPLE;
+import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to verify fairshare and minshare preemption with custom resources,
+ * using parameterization.
+ */
+@RunWith(Parameterized.class)
+public class TestFairSchedulerPreemptionCustomResources {
+  private final FairSchedulerPreemptionTestContext ctx;
+
+  private static final String CUSTOM_RES_1 = "custom_res_1";
+  private static final String CUSTOM_RES_2 = "custom_res_2";
+  private static final int CUSTOM_RES_1_BASE_VALUE = 10;
+  private static final int CUSTOM_RES_2_BASE_VALUE = 20;
+  private static final String MIN_RESOURCES =
+      "<minResources>memory-mb=4096,vcores=4,custom_res_1=40</minResources>";
+  private static final Resource BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST =
+      BuilderUtils.newResource(GB, 1);
+  private Resource resourcePerNode;
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> getParameters() {
+    return Arrays.asList(new Object[][] {
+        {"FairSharePreemptionWithDRF", 3},
+        {"MinSharePreemptionWithDRF", 1},
+    });
+  }
+
+  public TestFairSchedulerPreemptionCustomResources(String name, int mode) {
+    this.ctx = new FairSchedulerPreemptionTestContext(mode);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = createConfigWithCustomResources();
+    resourcePerNode = createResourcePerNode();
+    this.ctx.setup(conf, additionalQueueProperties(), resourcePerNode);
+  }
+
+  private Resource createResourcePerNode() {
+    ImmutableMap<String, String> customResources = ImmutableMap
+        .<String, String>builder()
+        .put(CUSTOM_RES_1,
+            String.valueOf(NODE_CAPACITY_MULTIPLE * CUSTOM_RES_1_BASE_VALUE))
+        .put(CUSTOM_RES_2,
+            String.valueOf(NODE_CAPACITY_MULTIPLE * CUSTOM_RES_2_BASE_VALUE))
+        .build();
+
+    // Ensure that memory is never the dominant resource in requests;
+    // this is why the memory amount is multiplied by 5 here.
+    return ResourceTypesTestHelper.newResource(
+        NODE_CAPACITY_MULTIPLE * GB * 5, 3 * NODE_CAPACITY_MULTIPLE,
+        customResources);
+  }
+
+  private String additionalQueueProperties() {
+    if (!ctx.isFairsharePreemption()) {
+      return MIN_RESOURCES;
+    }
+    return "";
+  }
+
+  private Configuration createConfigWithCustomResources() {
+    Configuration conf = ctx.createConfig();
+    String customResourcesStr = registerCustomResources(conf);
+    conf.set(YarnConfiguration.RESOURCE_TYPES, customResourcesStr);
+    ResourceUtils.resetResourceTypes(conf);
+    return conf;
+  }
+
+  private String registerCustomResources(Configuration conf) {
+    conf.set(YarnConfiguration.RESOURCE_TYPES + "." + CUSTOM_RES_1 + UNITS,
+        "G");
+    conf.set(YarnConfiguration.RESOURCE_TYPES + "." + CUSTOM_RES_2 + UNITS,
+        "G");
+    return Joiner.on(',').join(CUSTOM_RES_1, CUSTOM_RES_2);
+  }
+
+  private void takeAllResourcesOfCustomResource(String queueName, String
+      customRes) {
+    takeAllResourcesOfCustomResource(queueName, customRes,
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+  }
+
+  private void takeAllResourcesOfCustomResource(String queueName,
+      String customRes, Resource baseResource) {
+    // Create an app that takes up all the resources of
+    // a custom resource on the cluster
+    int numContainers = NODE_CAPACITY_MULTIPLE * ctx.getSizeOfRMNodes();
+    long totalValueOfCustomRes =
+        ctx.getTotalClusterCapacity().getResourceValue(customRes);
+    long customResAmount = totalValueOfCustomRes / numContainers;
+
+    final Resource capability;
+    if (baseResource != null) {
+      capability = ResourceTypesTestHelper.newResource(
+          (int) baseResource.getMemorySize(),
+          baseResource.getVirtualCores(),
+          ImmutableMap.<String, String>builder()
+              .put(customRes, String.valueOf(customResAmount))
+              .build());
+    } else {
+      capability = ResourceTypesTestHelper.newResource(
+          (int) BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getMemorySize(),
+          BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getVirtualCores(),
+          ImmutableMap.<String, String>builder()
+              .put(customRes, String.valueOf(customResAmount))
+              .build());
+    }
+
+    final ApplicationAttemptId appAttemptId =
+        ctx.createSchedulingRequest(capability, queueName, "default",
+            numContainers);
+    ctx.takeResourcesInternal(queueName, appAttemptId, numContainers);
+  }
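A worked example of the sizing logic above, derived from the constants defined in this test:

    // Two nodes, each with NODE_CAPACITY_MULTIPLE * CUSTOM_RES_1_BASE_VALUE
    // = 4 * 10 = 40 of custom_res_1, give a cluster total of 80.
    int numContainers = NODE_CAPACITY_MULTIPLE * 2;    // 8
    long customResAmount = 80 / numContainers;         // 10 per container
    // The greedy app therefore drains custom_res_1 exactly, while memory
    // (1 GB per container out of 40 GB total) stays far from saturation.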
+
+  void tryPreemptMoreThanFairShareCustomResource(String queueName,
+      String customRes)
+      throws InterruptedException {
+    int numContainers = NODE_CAPACITY_MULTIPLE * ctx.getSizeOfRMNodes() / 2;
+    long customResAmount = CUSTOM_RES_1_BASE_VALUE * 3;
+
+    Resource capability = ResourceTypesTestHelper.newResource(
+        (int) BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getMemorySize(),
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getVirtualCores(),
+        ImmutableMap.<String, String>builder()
+            .put(customRes, String.valueOf(customResAmount))
+            .build());
+
+    final ApplicationAttemptId appAttemptId =
+        ctx.createSchedulingRequest(capability, queueName, "default",
+            numContainers);
+    ctx.setStarvingApp(appAttemptId);
+    ctx.verifyPreemption(1, 5);
+  }
+
+  private void preemptHalfResourcesCustomResource(String queueName, String
+      customRes) {
+    int numContainers = NODE_CAPACITY_MULTIPLE * ctx.getSizeOfRMNodes() / 2;
+    long totalValueOfCustomRes =
+        ctx.getTotalClusterCapacity().getResourceValue(customRes);
+    long customResAmount = totalValueOfCustomRes / numContainers;
+
+    Resource capability = ResourceTypesTestHelper.newResource(
+        2 * GB, 2,
+        ImmutableMap.<String, String>builder()
+            .put(customRes, String.valueOf(customResAmount))
+            .build());
+
+    final ApplicationAttemptId appAttemptId =
+        ctx.createSchedulingRequest(capability, queueName, "default",
+            numContainers);
+    ctx.setStarvingApp(appAttemptId);
+    ctx.identifyStarvation();
+  }
+
+  private void submitAppsCustomResources(String queue1, String queue2,
+      String customRes) {
+    takeAllResourcesOfCustomResource(queue1, customRes,
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+    preemptHalfResourcesCustomResource(queue2, customRes);
+  }
+
+  @After
+  public void teardown() {
+    this.ctx.teardown();
+  }
+
+  @Test
+  public void testPreemptionWithinSameLeafQueue() throws Exception {
+    String queue = "root.preemptable.child-1";
+    // In case of minshare preemption, make sure to request
+    // the minimum share of the queue [4 * (1GB, 1vcore) --> 4GB, 4vcores]
+    // so that minshare starvation does not happen for basic resources.
+    submitAppsCustomResources(queue, queue, CUSTOM_RES_1);
+
+    if (ctx.isFairsharePreemption()) {
+      ctx.verifyPreemption(2, 4);
+    } else {
+      ctx.verifyNoPreemption();
+    }
+  }
+
+  @Test
+  public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+
+    submitAppsCustomResources(queue1, queue2, CUSTOM_RES_1);
+    ctx.verifyPreemption(2, 4);
+  }
+
+  @Test
+  public void testPreemptionBetweenNonSiblingQueues() throws Exception {
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.nonpreemptable.child-1";
+
+    submitAppsCustomResources(queue1, queue2, CUSTOM_RES_1);
+    ctx.verifyPreemption(2, 4);
+  }
+
+  @Test
+  public void testNoPreemptionFromDisallowedQueue() throws Exception {
+    ctx.submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
+    ctx.verifyNoPreemption();
+  }
+
+  @Test
+  public void testPreemptionSelectNonAMContainer() throws Exception {
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+    takeAllResourcesOfCustomResource(queue1, CUSTOM_RES_1);
+    ctx.setNumAMContainersPerNode(2);
+    preemptHalfResourcesCustomResource(queue2, CUSTOM_RES_1);
+
+    ctx.verifyPreemption(2, 4);
+
+    List<RMContainer> containers = ctx.getLiveContainersOfStarvingApp();
+    String host0 = containers.get(0).getNodeId().getHost();
+    String host1 = containers.get(1).getNodeId().getHost();
+    // Each node provides two and only two non-AM containers to be preempted,
+    // so the preemption happens on both nodes.
+    assertTrue("Preempted containers should come from two different "
+        + "nodes.", !host0.equals(host1));
+  }
+
+  @Test
+  public void testRelaxLocalityToNotPreemptAM() throws Exception {
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+    takeAllResourcesOfCustomResource(queue1, CUSTOM_RES_1);
+    RMNode node1 = ctx.getRMNode(0);
+    ctx.setAllAMContainersOnNode(node1.getNodeID());
+    SchedulerNode node = ctx.scheduler.getNodeTracker()
+        .getNode(node1.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        node.getCopiedListOfRunningContainers().get(0)
+            .getApplicationAttemptId();
+
+    // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't
+    // be satisfied. This forces the RR that we consider for preemption to be
+    // the NODE_LOCAL one.
+    Resource normalResource = ResourceTypesTestHelper.newResource(
+        (int) BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getMemorySize(),
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getVirtualCores(),
+        ImmutableMap.<String, String>builder()
+            .put(CUSTOM_RES_1, String.valueOf(CUSTOM_RES_1_BASE_VALUE))
+            .build());
+    Resource biggerResource = ResourceTypesTestHelper.newResource(
+        (int) BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getMemorySize(),
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST.getVirtualCores(),
+        ImmutableMap.<String, String>builder()
+            .put(CUSTOM_RES_1, String.valueOf(CUSTOM_RES_1_BASE_VALUE * 10))
+            .build());
+    ResourceRequest nodeRequest =
+        ctx.createResourceRequest(normalResource, node1.getHostName(), 1, 4,
+            true);
+    ResourceRequest rackRequest =
+        ctx.createResourceRequest(biggerResource, node1.getRackName(), 1, 1,
+            true);
+    ResourceRequest anyRequest =
+        ctx.createResourceRequest(biggerResource, ResourceRequest.ANY, 1, 1,
+            true);
+
+    List<ResourceRequest> resourceRequests =
+        Arrays.asList(nodeRequest, rackRequest, anyRequest);
+
+    ApplicationAttemptId starvedAppAttemptId = ctx.createSchedulingRequest(
+        queue2, "default", resourceRequests);
+    ctx.setStarvingApp(starvedAppAttemptId);
+    ctx.identifyStarvation();
+
+    // Make sure 4 containers were preempted from the greedy app, but also that
+    // none were preempted on our all-AM node, even though the NODE_LOCAL RR
+    // asked for resources on it.
+
+    // TODO (YARN-7655) The starved app should be allocated 4 containers.
+    // It should be possible to modify the RRs such that this is true
+    // after YARN-7903.
+    ctx.verifyPreemption(0, 4);
+    for (RMContainer container : node.getCopiedListOfRunningContainers()) {
+      assert (container.isAMContainer());
+      assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
+    }
+  }
+
+  @Test
+  public void testAppNotPreemptedBelowFairShare() throws Exception {
+    takeAllResourcesOfCustomResource("root.preemptable.child-1", CUSTOM_RES_1);
+    tryPreemptMoreThanFairShareCustomResource("root.preemptable.child-2",
+        CUSTOM_RES_1);
+  }
+
+  @Test
+  public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
+      throws InterruptedException {
+    // Run this test only for fairshare preemption
+    if (!ctx.isFairsharePreemption()) {
+      return;
+    }
+
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable-sibling";
+    String queue3 = "root.preemptable.child-2";
+    takeAllResourcesOfCustomResource(queue1, CUSTOM_RES_1);
+    preemptHalfResourcesCustomResource(queue2, CUSTOM_RES_1);
+    ctx.verifyPreemption(2, 4);
+    preemptHalfResourcesCustomResource(queue3, CUSTOM_RES_1);
+    ctx.verifyPreemption(1, 2);
+  }
+
+  @Test
+  public void
+      testGetMaxUtilizationOfAllocatedResourcesWithoutAllocatedResource() {
+    assertEquals(0,
+        ctx.getMaxUtilizationOfAllocatedResources(), 0.0);
+  }
+
+  @Test
+  public void testGetMaxUtilizationOfAllocatedResourcesWithAllocatedResource()
+      throws IOException {
+    createConfigWithCustomResources();
+    ctx.conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 100);
+    ctx.setupCluster(resourcePerNode);
+    ctx.computeTotalClusterCapacity();
+
+    takeAllResourcesOfCustomResource("root", CUSTOM_RES_1,
+        BuilderUtils.newResource(400, 0));
+    assertEquals(1,
+        ctx.getMaxUtilizationOfAllocatedResources(), 0.0);
+  }
+}