diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 9aeb51cc2cc..5f205f63649 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -379,8 +379,15 @@ public float ratio(Resource a, Resource b) {
for (int i = 0; i < maxLength; i++) {
ResourceInformation aResourceInformation = a.getResourceInformation(i);
ResourceInformation bResourceInformation = b.getResourceInformation(i);
- float tmp = (float) aResourceInformation.getValue()
- / (float) bResourceInformation.getValue();
+
+ final float tmp;
+      // avoid division by zero; NOTE(review): a nonzero 'a' over a zero 'b'
+      // yields 0, i.e. that dimension is ignored — confirm this is intended
+ if (bResourceInformation.getValue() != 0) {
+ tmp = (float) aResourceInformation.getValue()
+ / (float) bResourceInformation.getValue();
+ } else {
+ tmp = 0;
+ }
ratio = ratio > tmp ? ratio : tmp;
}
return ratio;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/PreemptionResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/PreemptionResourceCalculator.java
new file mode 100644
index 00000000000..98512920181
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/PreemptionResourceCalculator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+
+/**
+ * A {@link ResourceCalculator} which uses the concept of
+ * dominant resource to compare multi-dimensional resources.
+ * The only difference is the ratio calculation, see the javadoc there.
+ */
+@Private
+@Unstable
+public class PreemptionResourceCalculator extends DefaultResourceCalculator {
+
+  /**
+   * Computes the ratio of a to b, with respect to all resource types.
+   * Returns 0 if, for some resource that 'a' requests, resource 'b'
+   * cannot fit into 'a' at least once. Example:
+   * Resource a: (memory: 4GB, vCores: 4)
+   * Resource b: (memory: 8GB, vCores: 4)
+   * Without the additional condition, this method would have returned 1,
+   * but it is wrong as memory of resource 'b' cannot fit in resource 'a'.
+   */
+ @Override
+ public float ratio(Resource a, Resource b) {
+ float ratio = 0.0f;
+ int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+ for (int i = 0; i < maxLength; i++) {
+ ResourceInformation aResourceInformation = a.getResourceInformation(i);
+ ResourceInformation bResourceInformation = b.getResourceInformation(i);
+
+ float tmp;
+      // avoid division by zero: a zero 'b' value makes tmp 0, so the
+      // dimension is skipped rather than producing Infinity/NaN
+ if (bResourceInformation.getValue() != 0) {
+ tmp = (float) aResourceInformation.getValue()
+ / (float) bResourceInformation.getValue();
+ } else {
+ tmp = 0;
+ }
+
+ if (aResourceInformation.getValue() > 0 && tmp < 1) {
+ return 0;
+ }
+
+ ratio = ratio > tmp ? ratio : tmp;
+ }
+ return ratio;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 48c2c364ae9..7e0db89b013 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -298,11 +298,18 @@ public static Resource subtract(Resource lhs, Resource rhs) {
*/
public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
subtractFrom(lhs, rhs);
- if (lhs.getMemorySize() < 0) {
- lhs.setMemorySize(0);
- }
- if (lhs.getVirtualCores() < 0) {
- lhs.setVirtualCores(0);
+
+    // make sure every resource, including custom ones, has a value of at least zero
+ int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+ for (int i = 0; i < maxLength; i++) {
+ try {
+ ResourceInformation lhsValue = lhs.getResourceInformation(i);
+ if (lhsValue.getValue() < 0) {
+ lhs.setResourceValue(i, 0);
+ }
+ } catch (ResourceNotFoundException ye) {
+ LOG.warn("Resource is missing:" + ye.getMessage());
+ }
}
return lhs;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index da5e4c9347e..4e67da85d73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
@@ -94,6 +95,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.PreemptionResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -151,6 +153,9 @@
private static final ResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
+
+ private static final ResourceCalculator PREEMPTION_RESOURCE_CALCULATOR =
+ new PreemptionResourceCalculator();
private static final ResourceCalculator DOMINANT_RESOURCE_CALCULATOR =
new DominantResourceCalculator();
@@ -1184,6 +1189,9 @@ public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
@Override
public ResourceCalculator getResourceCalculator() {
+ if (this.conf.getPreemptionEnabled()) {
+ return PREEMPTION_RESOURCE_CALCULATOR;
+ }
return RESOURCE_CALCULATOR;
}
@@ -1207,15 +1215,31 @@ private void updateRootQueueMetrics() {
*/
private boolean shouldAttemptPreemption() {
if (context.isPreemptionEnabled()) {
- return (context.getPreemptionUtilizationThreshold() < Math.max(
- (float) rootMetrics.getAllocatedMB() /
- getClusterResource().getMemorySize(),
- (float) rootMetrics.getAllocatedVirtualCores() /
- getClusterResource().getVirtualCores()));
+ final Resource clusterResource = getClusterResource();
+ float utilization =
+ getMaxUtilizationOfAllocatedResources(clusterResource);
+ return (context.getPreemptionUtilizationThreshold() < utilization);
}
return false;
}
+ @VisibleForTesting
+ float getMaxUtilizationOfAllocatedResources(
+ Resource clusterResource) {
+ final Resource allocated = rootMetrics.getAllocatedResources();
+ float maxUtilization = 0;
+ for (ResourceInformation res : allocated.getResources()) {
+ long resUtilization = clusterResource.getResourceValue(res.getName());
+ if (resUtilization != 0) {
+ float utilization = (float) res.getValue() / resUtilization;
+ if (utilization > maxUtilization) {
+ maxUtilization = utilization;
+ }
+ }
+ }
+ return maxUtilization;
+ }
+
@Override
public QueueMetrics getRootQueueMetrics() {
return rootMetrics;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerPreemptionTestContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerPreemptionTestContext.java
new file mode 100644
index 00000000000..526c5c1c428
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerPreemptionTestContext.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.extractCustomResourcesAsStrings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class that contains data and helper methods to group preemption related
+ * test functionality.
+ */
+public class FairSchedulerPreemptionTestContext extends FairSchedulerTestBase {
+ private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FairSchedulerPreemptionTestContext.class);
+
+ // Scheduler clock
+ private final ControlledClock clock = new ControlledClock();
+
+ // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+ static final int NODE_CAPACITY_MULTIPLE = 4;
+ static final int GB = 1024;
+
+ private final boolean fairsharePreemption;
+ private final boolean drf;
+
+ // App that takes up the entire cluster
+ private FSAppAttempt greedyApp;
+
+ // Starving app that is expected to instigate preemption
+ private FSAppAttempt starvingApp;
+ private Resource totalClusterCapacity;
+
+ FairSchedulerPreemptionTestContext(int mode) {
+ this.fairsharePreemption = (mode > 1); // 2 and 3
+ this.drf = (mode % 2 == 1); // 1 and 3
+ }
+
+ void setup(String additionalQueueProperties,
+ Resource resourcePerNode) throws IOException {
+ this.conf = createConfig();
+ writeAllocFile(additionalQueueProperties);
+ setupCluster(resourcePerNode);
+ computeTotalClusterCapacity();
+ LOG.debug("Total cluster capacity: " + totalClusterCapacity);
+ }
+
+ void setup(Configuration conf, String additionalQueueProperties,
+ Resource resourcePerNode) throws IOException {
+ this.conf = conf;
+ writeAllocFile(additionalQueueProperties);
+ setupCluster(resourcePerNode);
+ computeTotalClusterCapacity();
+ LOG.debug("Total cluster capacity: " + totalClusterCapacity);
+ }
+
+ Configuration createConfig() {
+ createConfiguration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+ ALLOC_FILE.getAbsolutePath());
+ conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+ conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+ return conf;
+ }
+
+ void computeTotalClusterCapacity() {
+ totalClusterCapacity = Resources.createResource(0, 0);
+ for (RMNode rmNode : rmNodes) {
+ Resources.addTo(totalClusterCapacity, rmNode.getTotalCapability());
+ }
+ }
+
+ void teardown() {
+ ALLOC_FILE.delete();
+ conf = null;
+ if (resourceManager != null) {
+ resourceManager.stop();
+ resourceManager = null;
+ }
+ }
+
+ boolean isFairsharePreemption() {
+ return fairsharePreemption;
+ }
+
+ private void writeAllocFile(String additionalQueueProperties)
+ throws IOException {
+ /*
+ * Queue hierarchy:
+ * root
+ * |--- preemptable
+ * |--- child-1
+ * |--- child-2
+ * |--- preemptable-sibling
+ * |--- nonpreemptible
+ * |--- child-1
+ * |--- child-2
+ */
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    out.println("<queue name=\"preemptable\">");
+ writePreemptionParams(out);
+
+ // Child-1
+    out.println("<queue name=\"child-1\">");
+ writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+ // Child-2
+    out.println("<queue name=\"child-2\">");
+ writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of preemptable queue
+
+    out.println("<queue name=\"preemptable-sibling\">");
+ writePreemptionParams(out);
+    out.println("</queue>");
+
+ // Queue with preemption disallowed
+    out.println("<queue name=\"nonpreemptable\">");
+    out.println("<allowPreemptionFrom>false" +
+        "</allowPreemptionFrom>");
+ writePreemptionParams(out);
+
+ // Child-1
+    out.println("<queue name=\"child-1\">");
+ writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+ // Child-2
+    out.println("<queue name=\"child-2\">");
+ writeResourceParams(out, additionalQueueProperties);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of nonpreemptable queue
+
+ if (drf) {
+      out.println("<defaultQueueSchedulingPolicy>drf" +
+          "</defaultQueueSchedulingPolicy>");
+ }
+    out.println("</allocations>");
+ out.close();
+
+ assertTrue("Allocation file does not exist, not running the test",
+ ALLOC_FILE.exists());
+ }
+
+ private void writePreemptionParams(PrintWriter out) {
+ if (fairsharePreemption) {
+      out.println("<fairSharePreemptionThreshold>1" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<fairSharePreemptionTimeout>0" +
+          "</fairSharePreemptionTimeout>");
+    } else {
+      out.println("<minSharePreemptionTimeout>0" +
+          "</minSharePreemptionTimeout>");
+ }
+ }
+
+ private void writeResourceParams(PrintWriter out, String
+ additionalQueueProperties) {
+ if (additionalQueueProperties != null &&
+ !additionalQueueProperties.isEmpty()) {
+ out.println(additionalQueueProperties);
+ }
+ }
+
+ void setupCluster(Resource resourcePerNode) throws IOException {
+ rmNodes.clear();
+ resourceManager = new MockRM(conf);
+ scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+ // YARN-6249, FSLeafQueue#lastTimeAtMinShare is initialized to the time in
+ // the real world, so we should keep the clock up with it.
+ clock.setTime(SystemClock.getInstance().getTime());
+ scheduler.setClock(clock);
+ resourceManager.start();
+
+ int memory = (int) resourcePerNode.getMemorySize();
+ int vCores = resourcePerNode.getVirtualCores();
+ if (resourcePerNode.getResources().length > 2) {
+      Map<String, String> customResources = extractCustomResourcesAsStrings(
+ resourcePerNode);
+ addNode(memory, vCores, customResources);
+ addNode(memory, vCores, customResources);
+ } else {
+ addNode(memory, vCores);
+ addNode(memory, vCores);
+ }
+
+ // Reinitialize the scheduler so DRF policy picks up cluster capacity
+ // TODO (YARN-6194): One shouldn't need to call this
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Verify if child-1 and child-2 are preemptable
+ FSQueue child1 =
+ scheduler.getQueueManager().getQueue("nonpreemptable.child-1");
+ assertFalse(child1.isPreemptable());
+ FSQueue child2 =
+ scheduler.getQueueManager().getQueue("nonpreemptable.child-2");
+ assertFalse(child2.isPreemptable());
+ }
+
+ private void sendEnoughNodeUpdatesToAssignFully() {
+ for (RMNode node : rmNodes) {
+ NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+ new NodeUpdateSchedulerEvent(node);
+ for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+ scheduler.handle(nodeUpdateSchedulerEvent);
+ }
+ }
+ }
+
+ /**
+ * Set the number of AM containers for each node.
+ *
+ * @param numAMContainersPerNode number of AM containers per node
+ */
+ void setNumAMContainersPerNode(int numAMContainersPerNode) {
+    List<FSSchedulerNode> potentialNodes =
+ scheduler.getNodeTracker().getNodesByResourceName("*");
+ for (FSSchedulerNode node: potentialNodes) {
+      List<RMContainer> containers =
+ node.getCopiedListOfRunningContainers();
+ // Change the first numAMContainersPerNode out of 4 containers to
+ // AM containers
+ for (int i = 0; i < numAMContainersPerNode; i++) {
+ ((RMContainerImpl) containers.get(i)).setAMContainer(true);
+ }
+ }
+ }
+
+ void setAllAMContainersOnNode(NodeId nodeId) {
+ SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+ for (RMContainer container: node.getCopiedListOfRunningContainers()) {
+ ((RMContainerImpl) container).setAMContainer(true);
+ }
+ }
+
+  List<RMContainer> getLiveContainersOfStarvingApp() {
+    return (ArrayList<RMContainer>) starvingApp.getLiveContainers();
+ }
+
+ RMNode getRMNode(int i) {
+ return rmNodes.get(i);
+ }
+
+ void setStarvingApp(ApplicationAttemptId starvedAppAttemptId) {
+ starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
+ }
+
+ void identifyStarvation() {
+ // Move clock enough to identify starvation
+ clock.tickSec(1);
+ scheduler.update();
+ }
+
+ void tryPreemptMoreThanFairShare(String queueName)
+ throws InterruptedException {
+ ApplicationAttemptId appAttemptId
+ = createSchedulingRequest(3 * GB, 3, queueName, "default",
+ NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+ starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+ verifyPreemption(1, 5);
+ }
+
+ float getMaxUtilizationOfAllocatedResources() {
+ return scheduler
+ .getMaxUtilizationOfAllocatedResources(totalClusterCapacity);
+ }
+
+ /**
+ * Submit an application to a given queue and take over the entire cluster.
+ *
+ * @param queueName queue name
+ */
+ void takeAllResources(String queueName) {
+ // Create an app that takes up all the resources on the cluster
+ int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size();
+ ApplicationAttemptId appAttemptId =
+ createSchedulingRequest(GB, 1, queueName, "default", numContainers);
+ takeResourcesInternal(queueName, appAttemptId, numContainers);
+ }
+
+ void takeResourcesInternal(String queueName,
+ ApplicationAttemptId appAttemptId, int numContainers) {
+ greedyApp = scheduler.getSchedulerApp(appAttemptId);
+ scheduler.update();
+ sendEnoughNodeUpdatesToAssignFully();
+
+    // the greedy app is expected to fill the cluster completely,
+    // i.e. it should acquire all of the requested containers
+ assertEquals(numContainers, greedyApp.getLiveContainers().size());
+ // Verify preemptable for queue and app attempt
+ assertTrue(
+ scheduler.getQueueManager().getQueue(queueName).isPreemptable()
+ == greedyApp.isPreemptable());
+ }
+
+ /**
+ * Submit an application to a given queue and preempt half resources of the
+ * cluster.
+ *
+ * @param queueName queue name
+ */
+ void preemptHalfResources(String queueName) {
+ int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2;
+ ApplicationAttemptId appAttemptId = createSchedulingRequest(
+ 2 * GB, 2, queueName, "default", numContainers);
+ starvingApp = scheduler.getSchedulerApp(appAttemptId);
+ identifyStarvation();
+ }
+
+ /**
+ * Submit application to {@code queue1} and take over the entire cluster.
+ * Submit application with larger containers to {@code queue2} that
+ * requires preemption from the first application.
+ *
+ * @param queue1 first queue
+ * @param queue2 second queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ void submitApps(String queue1, String queue2) {
+ takeAllResources(queue1);
+ preemptHalfResources(queue2);
+ }
+
+ void verifyPreemption(int numStarvedAppContainers,
+ int numGreedyAppContainers)
+ throws InterruptedException {
+ // Sleep long enough for four containers to be preempted.
+ for (int i = 0; i < 1000; i++) {
+ if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+
+ // Post preemption, verify the greedyApp has the correct # of containers.
+ assertEquals("Incorrect # of containers on the greedy app",
+ numGreedyAppContainers, greedyApp.getLiveContainers().size());
+
+ // Verify the queue metrics are set appropriately. The greedyApp started
+ // with 8 1GB, 1vcore containers.
+ assertEquals("Incorrect # of preempted containers in QueueMetrics",
+ 8 - numGreedyAppContainers,
+ greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
+
+ // Verify the node is reserved for the starvingApp
+ for (RMNode rmNode : rmNodes) {
+ FSSchedulerNode node = (FSSchedulerNode)
+ scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+ if (node.getContainersForPreemption().size() > 0) {
+ assertTrue("node should be reserved for the starvingApp",
+ node.getPreemptionList().keySet().contains(starvingApp));
+ }
+ }
+
+ sendEnoughNodeUpdatesToAssignFully();
+
+ // Verify the preempted containers are assigned to starvingApp
+ assertEquals("Starved app is not assigned the right # of containers",
+ numStarvedAppContainers, starvingApp.getLiveContainers().size());
+
+ // Verify the node is not reserved for the starvingApp anymore
+ for (RMNode rmNode : rmNodes) {
+ FSSchedulerNode node = (FSSchedulerNode)
+ scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+ if (node.getContainersForPreemption().size() > 0) {
+ assertFalse(node.getPreemptionList().keySet().contains(starvingApp));
+ }
+ }
+ }
+
+ void verifyNoPreemption() throws InterruptedException {
+ // Sleep long enough to ensure not even one container is preempted.
+ for (int i = 0; i < 100; i++) {
+ if (greedyApp.getLiveContainers().size() != 8) {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ assertEquals(8, greedyApp.getLiveContainers().size());
+ }
+
+ public Resource getTotalClusterCapacity() {
+ return totalClusterCapacity;
+ }
+
+ public int getSizeOfRMNodes() {
+ return rmNodes.size();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 4f1f20b942b..27edf371cc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -60,6 +61,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
public class FairSchedulerTestBase {
public final static String TEST_DIR =
@@ -134,6 +136,22 @@ protected ResourceRequest createResourceRequest(
return request;
}
+ protected ResourceRequest createResourceRequest(
+ Resource capability, String host, int priority, int numContainers,
+ boolean relaxLocality) {
+ ResourceRequest request = recordFactory
+ .newRecordInstance(ResourceRequest.class);
+ request.setCapability(capability);
+ request.setResourceName(host);
+ request.setNumContainers(numContainers);
+ Priority prio = recordFactory.newRecordInstance(Priority.class);
+ prio.setPriority(priority);
+ request.setPriority(prio);
+ request.setRelaxLocality(relaxLocality);
+ request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+ return request;
+ }
+
/**
* Creates a single container priority-1 request and submits to
* scheduler.
@@ -158,6 +176,12 @@ protected ApplicationAttemptId createSchedulingRequest(
return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
}
+ protected ApplicationAttemptId createSchedulingRequest(
+ Resource capability, String queueId, String userId, int numContainers) {
+ return createSchedulingRequest(capability, queueId, userId, numContainers,
+ 1);
+ }
+
protected ApplicationAttemptId createSchedulingRequest(
int memory, String queueId, String userId, int numContainers, int priority) {
return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
@@ -173,6 +197,15 @@ protected ApplicationAttemptId createSchedulingRequest(
userId);
}
+ protected ApplicationAttemptId createSchedulingRequest(
+ Resource capability, String queueId, String userId, int numContainers,
+ int priority) {
+ ResourceRequest request = createResourceRequest(capability,
+ ResourceRequest.ANY, priority, numContainers, true);
+ return createSchedulingRequest(Lists.newArrayList(request), queueId,
+ userId);
+ }
+
protected ApplicationAttemptId createSchedulingRequest(
      Collection<ResourceRequest> requests, String queueId, String userId) {
ApplicationAttemptId id =
@@ -357,4 +390,15 @@ protected void addNode(int memory, int cores) {
scheduler.handle(new NodeAddedSchedulerEvent(node));
rmNodes.add(node);
}
+
+ protected void addNode(int memory, int cores,
+      Map<String, String> customResources) {
+ int id = rmNodes.size() + 1;
+ RMNode node = MockNodes.newNodeInfo(1,
+ ResourceTypesTestHelper.newResource(memory, cores, customResources),
+ id, "127.0.0." + id);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+ rmNodes.add(node);
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..2b5cca587a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -18,56 +18,37 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerPreemptionTestContext.GB;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+ .FairSchedulerPreemptionTestContext.NODE_CAPACITY_MULTIPLE;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests to verify fairshare and minshare preemption, using parameterization.
*/
@RunWith(Parameterized.class)
-public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
- private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
- private static final int GB = 1024;
-
- // Scheduler clock
- private final ControlledClock clock = new ControlledClock();
-
- // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
- private static final int NODE_CAPACITY_MULTIPLE = 4;
-
- private final boolean fairsharePreemption;
- private final boolean drf;
+public class TestFairSchedulerPreemption {
+ private final FairSchedulerPreemptionTestContext ctx;
- // App that takes up the entire cluster
- private FSAppAttempt greedyApp;
-
- // Starving app that is expected to instigate preemption
- private FSAppAttempt starvingApp;
+ private static final String MIN_RESOURCES = "4096mb,4vcores";
@Parameterized.Parameters(name = "{0}")
public static Collection