diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java index 85d8715..e9931a5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java @@ -29,11 +29,15 @@ public class PendingAsk { private final Resource perAllocationResource; private final int count; - public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0); + private final boolean isGuaranteedTypeEnforced; - public PendingAsk(Resource res, int num) { + public final static PendingAsk ZERO = + new PendingAsk(Resources.none(), 0, false); + + public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) { this.perAllocationResource = res; this.count = num; + this.isGuaranteedTypeEnforced = guaranteedTypeEnforced; } public Resource getPerAllocationResource() { @@ -44,11 +48,17 @@ public int getCount() { return count; } + public boolean isGuaranteedTypeEnforced() { + return isGuaranteedTypeEnforced; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(""); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 1d4deee..25a0cf6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -846,6 +846,11 @@ private Resource assignContainer( FSSchedulerNode node, PendingAsk pendingAsk, NodeType type, boolean reserved, boolean opportunistic, SchedulerRequestKey schedulerKey) { + if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) { + // do not attempt to assign an OPPORTUNISTIC container to a resource + // request that has explicitly opted out of oversubscription + return Resources.none(); + } // How much does this request need? Resource capability = pendingAsk.getPerAllocationResource(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java index 6cc8cc7..5aaba51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java @@ -21,6 +21,8 @@ import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; @@ -176,8 +178,9 @@ public PendingAsk getPendingAsk(String resourceName) { if (null == request) { return PendingAsk.ZERO; } else{ + boolean guaranteedEnforced = enforceGuaranteedExecutionType(request); return new PendingAsk(request.getCapability(), - request.getNumContainers()); + request.getNumContainers(), guaranteedEnforced); } } finally { readLock.unlock(); @@ -185,6 +188,21 @@ public PendingAsk getPendingAsk(String resourceName) { } + /** + * Check for a given ResourceRequest, if its guaranteed execution type + * needs to be enforced. + * @param request resource request + * @return true if its guaranteed execution type is to be enforced. + * false otherwise + */ + private static boolean enforceGuaranteedExecutionType( + ResourceRequest request) { + ExecutionTypeRequest executionType = request.getExecutionTypeRequest(); + return executionType != null && + executionType.getExecutionType() == ExecutionType.GUARANTEED && + executionType.getEnforceExecutionType(); + } + @Override public int getOutstandingAsksCount(String resourceName) { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd..6d9df4d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -115,9 +117,15 @@ protected ResourceRequest createResourceRequest( relaxLocality); } + protected ResourceRequest createResourceRequest(int memory, int vcores, + String host, int priority, int numContainers, boolean relaxLocality) { + return createResourceRequest(memory, vcores, host, priority, + numContainers, relaxLocality, false); + } + protected ResourceRequest createResourceRequest( int memory, int vcores, String host, int priority, int numContainers, - boolean relaxLocality) { + boolean relaxLocality, boolean guaranteedExecutionEnforced) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); request.setCapability(BuilderUtils.newResource(memory, vcores)); request.setResourceName(host); @@ -127,6 +135,11 @@ protected ResourceRequest createResourceRequest( request.setPriority(prio); request.setRelaxLocality(relaxLocality); request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + if (guaranteedExecutionEnforced) { + ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true); + request.setExecutionTypeRequest(executionType); + } return request; } @@ -150,6 +163,13 @@ protected ApplicationAttemptId createSchedulingRequest( } protected ApplicationAttemptId createSchedulingRequest( + int memory, String queueId, String userId, + int numContainers, boolean guaranteedExecutionEnforced) { + return createSchedulingRequest(memory, 1, queueId, + userId, numContainers, 1, guaranteedExecutionEnforced); + } + + protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers) { return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); } @@ -163,6 +183,13 @@ protected ApplicationAttemptId createSchedulingRequest( protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { + return createSchedulingRequest(memory, vcores, queueId, userId, + numContainers, priority, false); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority, boolean guaranteedExecutionEnforced) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected @@ -171,8 +198,9 @@ protected ApplicationAttemptId createSchedulingRequest( scheduler.addApplicationAttempt(id, false, false); } List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); + ResourceRequest request = createResourceRequest(memory, vcores, + ResourceRequest.ANY, priority, numContainers, true, + guaranteedExecutionEnforced); ask.add(request); RMApp rmApp = mock(RMApp.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e70053c..b337757 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2704,6 +2704,83 @@ public void testReservationWithMultiplePriorities() throws IOException { getPriority().getPriority()); } + @Test + public void testResourceRequestOptOutOfOversubscription() throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation threshold + // of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that leaves some unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3600, "queue1", "user1", 1, false); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3600, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container runs on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1024, 0, 0.1f)); + + // create another scheduling request that opts out of oversubscription and + // asks for more than what's left unallocated on the node. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1536, "queue2", "user1", 1, true); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 0); + + // verify that a reservation is made for the second request + assertTrue("A reservation should be made for the second request", + scheduler.getNode(node.getNodeID()).getReservedContainer(). + getReservedResource().equals(Resource.newInstance(1536, 1))); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + /** * Test that NO OPPORTUNISTIC containers can be allocated on a node that * is fully allocated and with a very high utilization.