diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 48c2c364ae9..37cee571b6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -304,6 +304,18 @@ public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
     if (lhs.getVirtualCores() < 0) {
       lhs.setVirtualCores(0);
     }
+    // make sure all custom resources have a value of zero at least
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 2; i < maxLength; i++) {
+      try {
+        ResourceInformation lhsValue = lhs.getResourceInformation(i);
+        if (lhsValue.getValue() < 0) {
+          lhs.setResourceValue(i, 0);
+        }
+      } catch (ResourceNotFoundException ye) {
+        LOG.warn("Resource is missing:" + ye.getMessage());
+      }
+    }
     return lhs;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index da5e4c9347e..0251e9405b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
@@ -1207,15 +1208,32 @@ private void updateRootQueueMetrics() {
    */
   private boolean shouldAttemptPreemption() {
     if (context.isPreemptionEnabled()) {
-      return (context.getPreemptionUtilizationThreshold() < Math.max(
-          (float) rootMetrics.getAllocatedMB() /
-              getClusterResource().getMemorySize(),
-          (float) rootMetrics.getAllocatedVirtualCores() /
-              getClusterResource().getVirtualCores()));
+      final Resource clusterResource = getClusterResource();
+      float utilization =
+          getMaxUtilizationOfAllocatedResources(clusterResource);
+      return (context.getPreemptionUtilizationThreshold() < utilization);
     }
     return false;
   }
 
+  @VisibleForTesting
+  float getMaxUtilizationOfAllocatedResources(
+      Resource clusterResource) {
+    final Resource allocated = rootMetrics.getAllocatedResources();
+    float maxUtilization = 0;
+    for (ResourceInformation res : allocated.getResources()) {
+      long clusterUtilization =
+          clusterResource.getResourceValue(res.getName());
+      if (clusterUtilization != 0) {
+        float utilization = (float) res.getValue() / clusterUtilization;
+        if (utilization > maxUtilization) {
+          maxUtilization = utilization;
+        }
+      }
+    }
+    return maxUtilization;
+  }
+
   @Override
   public QueueMetrics getRootQueueMetrics() {
     return rootMetrics;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 4f1f20b942b..46837a4c0c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -60,6 +61,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 public class FairSchedulerTestBase {
   public final static String TEST_DIR =
@@ -357,4 +359,15 @@ protected void addNode(int memory, int cores) {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
     rmNodes.add(node);
   }
+
+  protected void addNode(int memory, int cores,
+      Map<String, String> customResources) {
+    int id = rmNodes.size() + 1;
+    RMNode node = MockNodes.newNodeInfo(1,
+        ResourceTypesTestHelper.newResource(memory, cores, customResources),
+        id, "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..a4781d50a44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -17,25 +17,34 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -46,6 +55,11 @@
 import java.util.Collection;
 import java.util.List;
 
+import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests to verify fairshare and minshare preemption, using parameterization.
  */
@@ -53,6 +67,8 @@
 public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
   private static final int GB = 1024;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFairSchedulerPreemption.class);
 
   // Scheduler clock
   private final ControlledClock clock = new ControlledClock();
@@ -60,29 +76,40 @@
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
 
+  private static final String CUSTOM_RES_1 = "custom_res_1";
+  private static final String CUSTOM_RES_2 = "custom_res_2";
+  private static final Resource BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST =
+      BuilderUtils.newResource(GB, 1);
+
   private final boolean fairsharePreemption;
   private final boolean drf;
+  private final boolean useCustomResources;
 
   // App that takes up the entire cluster
   private FSAppAttempt greedyApp;
 
   // Starving app that is expected to instigate preemption
   private FSAppAttempt starvingApp;
+
+  private Resource totalClusterCapacity;
 
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "{0} - {2}")
   public static Collection<Object[]> getParameters() {
     return Arrays.asList(new Object[][] {
-        {"MinSharePreemption", 0},
-        {"MinSharePreemptionWithDRF", 1},
-        {"FairSharePreemption", 2},
-        {"FairSharePreemptionWithDRF", 3}
+        {"MinSharePreemption", 0, false},
+        {"MinSharePreemptionWithDRF", 1, false},
+        {"FairSharePreemption", 2, false},
+        {"FairSharePreemptionWithDRF", 3, false},
+        {"FairSharePreemptionWithDRF", 3, true},
+        {"MinSharePreemptionWithDRF", 1, true},
     });
   }
 
-  public TestFairSchedulerPreemption(String name, int mode)
-      throws IOException {
+  public TestFairSchedulerPreemption(String name, int mode,
+      boolean useCustomResources) throws IOException {
     fairsharePreemption = (mode > 1); // 2 and 3
     drf = (mode % 2 == 1); // 1 and 3
+    this.useCustomResources = useCustomResources;
     writeAllocFile();
   }
 
@@ -94,7 +121,25 @@ public void setup() throws IOException {
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    if (useCustomResources) {
+      String resourceNames = registerCustomResources();
+      conf.set(YarnConfiguration.RESOURCE_TYPES, resourceNames);
+      ResourceUtils.resetResourceTypes(conf);
+    }
     setupCluster();
+    totalClusterCapacity = Resources.createResource(0, 0);
+    for (RMNode rmNode : rmNodes) {
+      Resources.addTo(totalClusterCapacity, rmNode.getTotalCapability());
+    }
+    LOG.debug("Total cluster capacity: " + totalClusterCapacity);
+  }
+
+  private String registerCustomResources() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES + "." + CUSTOM_RES_1 + UNITS,
+        "G");
+    conf.set(YarnConfiguration.RESOURCE_TYPES + "." + CUSTOM_RES_2 + UNITS,
+        "G");
+    return Joiner.on(',').join(CUSTOM_RES_1, CUSTOM_RES_2);
   }
 
   @After
@@ -185,7 +230,12 @@ private void writePreemptionParams(PrintWriter out) {
 
   private void writeResourceParams(PrintWriter out) {
     if (!fairsharePreemption) {
-      out.println("4096mb,4vcores");
+      if (useCustomResources) {
+        out.println("memory-mb=4096,vcores=4,"
+            + "custom_res_1=40");
+      } else {
+        out.println("4096mb,4vcores");
+      }
     }
   }
 
@@ -200,8 +250,19 @@ private void setupCluster() throws IOException {
 
     // Create and add two nodes to the cluster, with capacities
     // disproportional to the container requests.
-    addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
-    addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
+    if (useCustomResources) {
+      ImmutableMap<String, String> customResources = ImmutableMap
+          .<String, String>builder()
+          .put(CUSTOM_RES_1, String.valueOf(40))
+          .put(CUSTOM_RES_2, String.valueOf(80)).build();
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE,
+          customResources);
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE,
+          customResources);
+    } else {
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
+      addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
+    }
 
     // Reinitialize the scheduler so DRF policy picks up cluster capacity
     // TODO (YARN-6194): One shouldn't need to call this
@@ -233,13 +294,58 @@ private void sendEnoughNodeUpdatesToAssignFully() {
    */
   private void takeAllResources(String queueName) {
     // Create an app that takes up all the resources on the cluster
-    ApplicationAttemptId appAttemptId
-        = createSchedulingRequest(GB, 1, queueName, "default",
-            NODE_CAPACITY_MULTIPLE * rmNodes.size());
+    int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size();
+    ApplicationAttemptId appAttemptId =
+        createSchedulingRequest(GB, 1, queueName, "default", numContainers);
+    takeResourcesInternal(queueName, appAttemptId, numContainers);
+  }
+
+  private void takeAllResourcesOfCustomResource(String queueName,
+      String customRes, Resource baseResource) {
+    // Create an app that takes up all the resources of a custom resource on the
+    // cluster
+    int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size();
+    final ResourceRequest resourceRequest;
+    if (baseResource != null) {
+      resourceRequest =
+          createResourceRequest(
+              (int) baseResource.getMemorySize(),
+              baseResource.getVirtualCores(), ResourceRequest.ANY, 1,
+              numContainers,
+              true);
+    } else {
+      resourceRequest =
+          createResourceRequest(GB, 1, ResourceRequest.ANY, 1, numContainers,
+              true);
+    }
+
+    Resource capability = resourceRequest.getCapability();
+
+    long totalValueOfCustomRes =
+        totalClusterCapacity.getResourceValue(customRes);
+    long customResAmount = totalValueOfCustomRes / numContainers;
+    resourceRequest.setCapability(
+        ResourceTypesTestHelper.newResource(capability.getMemorySize(),
+            capability.getVirtualCores(),
+            ImmutableMap.<String, String>builder()
+                .put(customRes, String.valueOf(
+                    customResAmount))
+                .build()));
+
+    ApplicationAttemptId appAttemptId = createSchedulingRequest(
+        Lists.newArrayList(resourceRequest), queueName, "default");
+    takeResourcesInternal(queueName, appAttemptId, numContainers);
+  }
+
+  private void takeResourcesInternal(String queueName,
+      ApplicationAttemptId appAttemptId, int numContainers) {
     greedyApp = scheduler.getSchedulerApp(appAttemptId);
     scheduler.update();
     sendEnoughNodeUpdatesToAssignFully();
-    assertEquals(8, greedyApp.getLiveContainers().size());
+
+    //cluster has 8GB of memory / 24 vcores in total
+    //requested app has 1GB of memory / 1 vcores --> 8 containers
+    assertEquals(numContainers, greedyApp.getLiveContainers().size());
 
     // Verify preemptable for queue and app attempt
     assertTrue(
         scheduler.getQueueManager().getQueue(queueName).isPreemptable()
@@ -251,11 +357,8 @@ private void takeAllResources(String queueName) {
    * cluster.
    *
    * @param queueName queue name
-   * @throws InterruptedException
-   *         if any thread has interrupted the current thread.
    */
-  private void preemptHalfResources(String queueName)
-      throws InterruptedException {
+  private void preemptHalfResources(String queueName) {
     ApplicationAttemptId appAttemptId =
         createSchedulingRequest(2 * GB, 2, queueName, "default",
             NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
@@ -266,6 +369,30 @@ private void preemptHalfResources(String queueName)
     scheduler.update();
   }
 
+  private void preemptHalfResourcesCustomResource(String queueName,
+      String customRes) {
+    int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2;
+
+    long totalValueOfCustomRes =
+        totalClusterCapacity.getResourceValue(customRes);
+    long customResAmount = totalValueOfCustomRes / numContainers;
+    ResourceRequest resourceRequest = createResourceRequest(2 * GB, 2,
+        ResourceRequest.ANY, 1, numContainers, true);
+    Resource capability = resourceRequest.getCapability();
+    resourceRequest.setCapability(ResourceTypesTestHelper.newResource(
+        capability.getMemorySize(), capability.getVirtualCores(),
+        ImmutableMap.<String, String>builder()
+            .put(customRes, String.valueOf(customResAmount)).build()));
+
+    ApplicationAttemptId appAttemptId = createSchedulingRequest(
+        Lists.newArrayList(resourceRequest), queueName, "default");
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
+    scheduler.update();
+  }
+
   /**
    * Submit application to {@code queue1} and take over the entire cluster.
    * Submit application with larger containers to {@code queue2} that
@@ -275,11 +402,23 @@ private void preemptHalfResources(String queueName)
    * @param queue2 second queue
    * @throws InterruptedException if interrupted while waiting
    */
-  private void submitApps(String queue1, String queue2)
-      throws InterruptedException {
+  private void submitApps(String queue1, String queue2) {
     takeAllResources(queue1);
     preemptHalfResources(queue2);
   }
+
+  private void submitAppsCustomResources(String queue1,
+      String queue2, String customRes) {
+    takeAllResourcesOfCustomResource(queue1, customRes,
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+    preemptHalfResourcesCustomResource(queue2, customRes);
+  }
+
+  private void submitAppsCustomResources(String queue1,
+      String queue2, Resource res, String customRes) {
+    takeAllResourcesOfCustomResource(queue1, customRes, res);
+    preemptHalfResourcesCustomResource(queue2, customRes);
+  }
 
   private void verifyPreemption(int numStarvedAppContainers,
       int numGreedyAppContainers)
@@ -342,7 +481,20 @@ private void verifyNoPreemption() throws InterruptedException {
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
     String queue = "root.preemptable.child-1";
-    submitApps(queue, queue);
+    if (useCustomResources) {
+      if (!fairsharePreemption) {
+        // In case of minshare preemption, make sure to request
+        // minimum share of the queue [4 * (1GB, 1vcore) --> 4GB, 4vcores]
+        // so that minshare starvation does not happen for basic resources
+        submitAppsCustomResources(queue, queue,
+            BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST, CUSTOM_RES_1);
+      } else {
+        submitAppsCustomResources(queue, queue, CUSTOM_RES_1);
+      }
+    } else {
+      submitApps(queue, queue);
+    }
+
     if (fairsharePreemption) {
       verifyPreemption(2, 4);
     } else {
@@ -352,13 +504,27 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    submitApps("root.preemptable.child-1", "root.preemptable.child-2");
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+
+    if (useCustomResources) {
+      submitAppsCustomResources(queue1, queue2, CUSTOM_RES_1);
+    } else {
+      submitApps(queue1, queue2);
+    }
     verifyPreemption(2, 4);
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.nonpreemptable.child-1";
+
+    if (useCustomResources) {
+      submitAppsCustomResources(queue1, queue2, CUSTOM_RES_1);
+    } else {
+      submitApps(queue1, queue2);
+    }
     verifyPreemption(2, 4);
   }
 
@@ -396,9 +562,18 @@ private void setAllAMContainersOnNode(NodeId nodeId) {
 
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    setNumAMContainersPerNode(2);
-    preemptHalfResources("root.preemptable.child-2");
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+    if (useCustomResources) {
+      takeAllResourcesOfCustomResource(queue1, CUSTOM_RES_1,
+          BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+      setNumAMContainersPerNode(2);
+      preemptHalfResourcesCustomResource(queue2, CUSTOM_RES_1);
+    } else {
+      takeAllResources(queue1);
+      setNumAMContainersPerNode(2);
+      preemptHalfResources(queue2);
+    }
 
     verifyPreemption(2, 4);
 
@@ -414,7 +589,14 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
 
   @Test
   public void testRelaxLocalityToNotPreemptAM() throws Exception {
-    takeAllResources("root.preemptable.child-1");
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable.child-2";
+    if (useCustomResources) {
+      takeAllResourcesOfCustomResource(queue1, CUSTOM_RES_1,
+          BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+    } else {
+      takeAllResources(queue1);
+    }
     RMNode node1 = rmNodes.get(0);
     setAllAMContainersOnNode(node1.getNodeID());
     SchedulerNode node =
         scheduler.getNodeTracker().getNode(node1.getNodeID());
@@ -436,7 +618,7 @@ public void testRelaxLocalityToNotPreemptAM() throws Exception {
         Arrays.asList(nodeRequest, rackRequest, anyRequest);
 
     ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
-        "root.preemptable.child-2", "default", resourceRequests);
+        queue2, "default", resourceRequests);
     starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
 
     // Move clock enough to identify starvation
@@ -481,15 +663,50 @@ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
       return;
     }
 
-    // Let one of the child queues take over the entire cluster
-    takeAllResources("root.preemptable.child-1");
+    String queue1 = "root.preemptable.child-1";
+    String queue2 = "root.preemptable-sibling";
+    String queue3 = "root.preemptable.child-2";
+    if (useCustomResources) {
+      takeAllResourcesOfCustomResource(queue1, CUSTOM_RES_1,
+          BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+      preemptHalfResourcesCustomResource(queue2, CUSTOM_RES_1);
+      verifyPreemption(2, 4);
+      preemptHalfResourcesCustomResource(queue3, CUSTOM_RES_1);
+      verifyPreemption(1, 2);
+    } else {
+      // Let one of the child queues take over the entire cluster
+      takeAllResources(queue1);
 
-    // Submit a job so half the resources go to parent's sibling
-    preemptHalfResources("root.preemptable-sibling");
-    verifyPreemption(2, 4);
+      // Submit a job so half the resources go to parent's sibling
+      preemptHalfResources(queue2);
+      verifyPreemption(2, 4);
+
+      // Submit a job to the child's sibling to force preemption from the child
+      preemptHalfResources(queue3);
+      verifyPreemption(1, 2);
+    }
+  }
 
-    // Submit a job to the child's sibling to force preemption from the child
-    preemptHalfResources("root.preemptable.child-2");
-    verifyPreemption(1, 2);
+  @Test
+  public void
+      testGetMaxUtilizationOfAllocatedResourcesWithoutAllocatedResource() {
+    if (!useCustomResources) {
+      return;
+    }
+    assertEquals(0,
+        scheduler.getMaxUtilizationOfAllocatedResources(
+            totalClusterCapacity), 0.0);
+  }
+
+  @Test
+  public void testGetMaxUtilizationOfAllocatedResourcesWithAllocatedResource() {
+    if (!useCustomResources) {
+      return;
+    }
+    takeAllResourcesOfCustomResource("root", CUSTOM_RES_1,
+        BASE_RESOURCE_FOR_CUSTOM_RES_REQUEST);
+    assertEquals(1,
+        scheduler.getMaxUtilizationOfAllocatedResources(totalClusterCapacity),
+        0.0);
   }
 }