From 11f21cba679353a030d88f29a0145f7ea6b5df96 Mon Sep 17 00:00:00 2001 From: "tanu.ajmera" Date: Thu, 28 Jan 2021 13:55:04 +0530 Subject: [PATCH] YARN-10589. Improve logic of multi-node allocation --- .../scheduler/AppSchedulingInfo.java | 15 ++- .../capacity/allocator/AllocationState.java | 3 +- .../allocator/ContainerAllocation.java | 6 ++ .../allocator/RegularContainerAllocator.java | 21 +++- .../TestCapacitySchedulerAsyncScheduling.java | 96 +++++++++++++++++-- 5 files changed, 128 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8e65e6a42e3..093d88d3dd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -806,7 +806,20 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, try { AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(schedulerKey); - return (ap != null) && (ap.getPlacementAttempt() < retryAttempts) && + return (ap != null) && (ap.getPlacementAttempt() < retryAttempts); + } finally { + this.readLock.unlock(); + } + } + + public boolean precheckNodePartition(SchedulerRequestKey schedulerKey, + SchedulerNode schedulerNode, SchedulingMode schedulingMode, + Optional dcOpt) { + this.readLock.lock(); + try { + AppPlacementAllocator ap = + schedulerKeyToAppPlacementAllocator.get(schedulerKey); + return (ap != null) && ap.precheckNode(schedulerNode, schedulingMode, dcOpt); } finally { this.readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java index d1580bd995d..35ddffe19f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java @@ -24,5 +24,6 @@ LOCALITY_SKIPPED, QUEUE_SKIPPED, ALLOCATED, - RESERVED + RESERVED, + PARTITION_SKIPPED } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index b9b9bcff2f6..6199ac22a19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -54,6 +54,12 @@ public static final ContainerAllocation QUEUE_SKIPPED = new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED); + /** + * Skip the partition, and look at the other partitions of nodes + */ + public static final ContainerAllocation PARTITION_SKIPPED = + new ContainerAllocation(null, null, AllocationState.PARTITION_SKIPPED); + RMContainer containerToBeUnreserved; private Resource resourceToBeAllocated = Resources.none(); private AllocationState state; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index cced238b601..60bd21b34df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -156,6 +156,16 @@ private ContainerAllocation preCheckForNodeCandidateSet( ActivityLevel.NODE); return ContainerAllocation.PRIORITY_SKIPPED; } + if (!appInfo.precheckNodePartition + (schedulerKey, node, schedulingMode, dcOpt)) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey, + ActivityDiagnosticConstant. + NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS + + ActivitiesManager.getDiagnostics(dcOpt), + ActivityLevel.NODE); + return ContainerAllocation.PARTITION_SKIPPED; + } if (!application.getCSLeafQueue().getReservationContinueLooking()) { if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) { @@ -860,7 +870,11 @@ private ContainerAllocation allocate(Resource clusterResource, result = preCheckForNodeCandidateSet(clusterResource, node, schedulingMode, resourceLimits, schedulerKey); if (null != result) { - continue; + if (result == ContainerAllocation.PARTITION_SKIPPED) { + return result; + } else { + continue; + } } } else { // pre-check when allocating reserved container @@ -928,7 +942,10 @@ public CSAssignment assignContainers(Resource clusterResource, schedulingMode, resourceLimits, schedulerKey, null); AllocationState allocationState = result.getAllocationState(); - if (allocationState == AllocationState.PRIORITY_SKIPPED) { + // When the partition not meet the priority it will return + // AllocationState.PARTITION_SKIPPED, this should also skip. + if (allocationState == AllocationState.PRIORITY_SKIPPED + || allocationState == AllocationState.PARTITION_SKIPPED) { continue; } return getCSAssignmentFromAllocateResult(clusterResource, result, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 5f2bbf0190c..4b3ceaa2599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -20,15 +20,9 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -74,6 +68,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; public class TestCapacitySchedulerAsyncScheduling { @@ -130,6 +125,89 @@ public void testAsyncContainerAllocationWithMultiNode() throws Exception { testAsyncContainerAllocation(2); } + private Set toSet(String... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + @Test(timeout = 300000) + public void testAsyncContainerAllocationBasedOnPartition() throws Exception { + CapacitySchedulerConfiguration conf1 = + new CapacitySchedulerConfiguration(conf); + conf1.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + conf1.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + "resource-based"); + String policyName = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + conf1.set(policyName, POLICY_CLASS_NAME); + conf1.setBoolean(CapacitySchedulerConfiguration. + MULTI_NODE_PLACEMENT_ENABLED, true); + conf1.setInt(CapacitySchedulerConfiguration. + SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 2); + conf1.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 0); + conf1.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf1.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b", "c"}); + conf1.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf1.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + conf1.setCapacity(queueA, 10); + conf1.setMaximumCapacity(queueA, 15); + conf1.setAccessibleNodeLabels(queueA, toSet("x")); + + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + conf1.setCapacity(queueB, 20); + conf1.setAccessibleNodeLabels(queueB, toSet("y")); + + final RMNodeLabelsManager mgr1 = new NullRMNodeLabelsManager(); + mgr1.init(conf1); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr1; + } + }; + + mgr1.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x", true))); + mgr1.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("y", true))); + + rm.getRMContext().setNodeLabelManager(mgr1); + rm.start(); + + List nms = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + nms.add(rm.registerNode("127.0.0." + i + ":1234", 8000)); + } + keepNMHeartbeat(nms, 1000); + + //assign Node Partition name to all nodes + mgr1.addLabelsToNode(ImmutableMap.of(nms.get(0).getNodeId(), toSet("x"))); + mgr1.addLabelsToNode(ImmutableMap.of(nms.get(1).getNodeId(), toSet("y"))); + + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(200, rm) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withQueue("a") + .withUnmanagedAM(false) + .build(); + RMApp app = MockRMAppSubmitter.submit(rm, data); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nms.get(0)); + am.allocate("*", 1024, 1, new ArrayList(), "x"); + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), 1); + Assert.assertTrue(rm.waitForState(nms.get(0), containerId, + RMContainerState.ALLOCATED)); + rm.close(); + } + public void testAsyncContainerAllocation(int numThreads) throws Exception { conf.setInt( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, -- 2.28.0