diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index aae5292..9616724 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -683,13 +684,60 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
       }
 
       // Non-exclusive scheduling opportunity is different: we need reset
-      // it every time to make sure non-labeled resource request will be
+      // it when:
+      // - There's a pending request for the partition which cannot be
+      //   addressed with existing available resources of the partition, OR
+      // - It is allocated on the default partition
+      //
+      // This is to make sure non-labeled resource request will be
       // most likely allocated on non-labeled nodes first.
-      application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+      resetNonPartitionedSchedulingOpportunity(allocationResult, node,
+          priority);
     }
 
     return allocationResult;
   }
+
+  private void resetNonPartitionedSchedulingOpportunity(
+      ContainerAllocation allocationResult, FiCaSchedulerNode node,
+      Priority priority) {
+    // Non-exclusive scheduling opportunity needs to be reset when it is
+    // allocated on the default partition.
+    if (StringUtils.equals(node.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
+      application
+          .resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+      return;
+    }
+
+    // Total pending resource will be the sum of pending resources in the
+    // given partition and the current resource request for no_label.
+    Resource pendingResourceWithNewRequest = Resources
+        .add(
+            application.getCSContext().getClusterResourceUsage()
+                .getPending(node.getPartition()),
+            allocationResult.getResourceToBeAllocated());
+
+    // Calculate the proposed new used capacity if the given resource request
+    // is accepted.
+    Resource totalUsed = Resources.add(pendingResourceWithNewRequest,
+        application.getCSContext().getClusterResourceUsage()
+            .getUsed(node.getPartition()));
+
+    // Get the total label resource.
+    Resource labelResource = application.getCSContext().getRMContext()
+        .getNodeLabelManager()
+        .getResourceByLabel(node.getPartition(), Resources.none());
+
+    // If the proposed new used capacity (including the current resource
+    // request) is greater than the label capacity, we can reset the
+    // scheduling opportunity.
+    if (Resources.greaterThan(rc,
+        application.getCSContext().getClusterResource(), totalUsed,
+        labelResource)) {
+      application
+          .resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+    }
+  }
 
   private ContainerAllocation allocate(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 8009580..b519388 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -603,4 +603,8 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
       updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
     }
   }
+
+  public CapacitySchedulerContext getCSContext() {
+    return capacitySchedulerContext;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 9bb8827..d862c75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@@ -280,6 +281,7 @@ public void testPriorityWithPendingApplications() throws Exception {
     // If app3 (highest priority among rest) gets active, it indicates that
     // priority is working with pendingApplications.
     rm.killApp(app1.getApplicationId());
+    rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
 
     // kick the scheduler, app3 (high among pending) gets free space
     MockAM am3 = MockRM.launchAM(app3, rm, nm1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index cff1514..bd2c314 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -765,8 +765,6 @@ public RMNodeLabelsManager createNodeLabelManager() {
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
     MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
-
-    ContainerId nextContainerId;
 
     // launch an app to queue b1 (label = y), AM container should be launched in nm3
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
@@ -774,12 +772,13 @@ public RMNodeLabelsManager createNodeLabelManager() {
 
     // request containers from am2, priority=1 asks for "" and priority=2 asks
     // for "y", "y" container should be allocated first
-    nextContainerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
     am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
     am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
-    Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
-        RMContainerState.ALLOCATED));
+
+    // Do a node heartbeat once
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    cs.handle(new NodeUpdateSchedulerEvent(
+        rm1.getRMContext().getRMNodes().get(nm1.getNodeId())));
 
     // Check pending resource for am2, priority=1 doesn't get allocated before
     // priority=2 allocated
@@ -1671,7 +1670,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
     // Test case 7
     // After c allocated, d will go first because it has less used_capacity(x)
     // than c
-    doNMHeartbeat(rm, nm1.getNodeId(), 2);
+    doNMHeartbeat(rm, nm1.getNodeId(), 1);
     checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
         cs.getApplicationAttempt(am1.getApplicationAttemptId()));
     checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
-- 
2.7.4 (Apple Git-66)
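
Reviewer note on the RegularContainerAllocator change: the new condition resets the missed non-partitioned scheduling opportunity either when the container landed on the default (empty) partition, or when the partition's used + pending + newly requested resource exceeds the partition's capacity, i.e. the labeled partition alone can no longer absorb the demand. The standalone sketch below illustrates only that arithmetic; it reduces Resource comparisons to memory-in-MB longs instead of the ResourceCalculator machinery, and all names in it (ResetOpportunitySketch, PartitionState, shouldResetOpportunity) are hypothetical, not part of the patch.

  // Standalone illustration of the reset condition; not the patched classes.
  public class ResetOpportunitySketch {

    static final String NO_LABEL = "";

    // Per-partition usage as tracked by the scheduler, reduced to MB.
    static final class PartitionState {
      final long capacityMB; // total resource of the partition (label)
      final long usedMB;     // resource already allocated in the partition
      final long pendingMB;  // outstanding asks targeting the partition

      PartitionState(long capacityMB, long usedMB, long pendingMB) {
        this.capacityMB = capacityMB;
        this.usedMB = usedMB;
        this.pendingMB = pendingMB;
      }
    }

    static boolean shouldResetOpportunity(String partition,
        PartitionState state, long requestedMB) {
      // Case 1: allocated on the default partition, so non-labeled requests
      // are in fact landing on non-labeled nodes.
      if (NO_LABEL.equals(partition)) {
        return true;
      }
      // Case 2: pending + new request + used exceeds the partition's
      // capacity, i.e. the partition cannot satisfy the demand by itself.
      long proposedTotal = state.pendingMB + requestedMB + state.usedMB;
      return proposedTotal > state.capacityMB;
    }

    public static void main(String[] args) {
      PartitionState y = new PartitionState(8192, 6144, 1024);
      // 6144 used + 1024 pending + 2048 requested = 9216 > 8192 -> reset
      System.out.println(shouldResetOpportunity("y", y, 2048)); // true
      // 6144 used + 1024 pending + 512 requested = 7680 <= 8192 -> keep
      // counting missed opportunities
      System.out.println(shouldResetOpportunity("y", y, 512));  // false
    }
  }

The practical effect, as the patch's comments state, is that the scheduling-opportunity counter is no longer cleared on every allocation: it is cleared only when non-labeled requests are being served on non-labeled nodes, or when the labeled partition is saturated, so non-exclusive use of labeled nodes is not throttled once the partition genuinely cannot meet its pending demand.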