From 573b36095cfd035d47a58518c1b399d64848fd83 Mon Sep 17 00:00:00 2001 From: "tanu.ajmera" Date: Thu, 28 Jan 2021 13:55:04 +0530 Subject: [PATCH] YARN-10589. Improve logic of multi-node allocation --- .../scheduler/AppSchedulingInfo.java | 16 +++- .../capacity/allocator/AllocationState.java | 3 +- .../allocator/ContainerAllocation.java | 6 ++ .../allocator/RegularContainerAllocator.java | 20 +++- .../TestCapacitySchedulerAsyncScheduling.java | 95 +++++++++++++++++-- 5 files changed, 126 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8e65e6a42e3..c9bdad62a82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -806,8 +806,20 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, try { AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(schedulerKey); - return (ap != null) && (ap.getPlacementAttempt() < retryAttempts) && - ap.precheckNode(schedulerNode, schedulingMode, dcOpt); + return (ap != null) && (ap.getPlacementAttempt() < retryAttempts); + } finally { + this.readLock.unlock(); + } + } + + public boolean precheckNodePartition(SchedulerRequestKey schedulerKey, + SchedulerNode schedulerNode, SchedulingMode schedulingMode, + Optional dcOpt) { + this.readLock.lock(); + try { + AppPlacementAllocator ap = + schedulerKeyToAppPlacementAllocator.get(schedulerKey); + 
return (ap != null) && ap.precheckNode(schedulerNode, schedulingMode, dcOpt); } finally { this.readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java index d1580bd995d..35ddffe19f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java @@ -24,5 +24,6 @@ LOCALITY_SKIPPED, QUEUE_SKIPPED, ALLOCATED, - RESERVED + RESERVED, + PARTITION_SKIPPED } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index b9b9bcff2f6..6199ac22a19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -54,6 +54,12 @@ public static 
final ContainerAllocation QUEUE_SKIPPED = new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED); + /** + * Skip the current partition, and look at nodes in other partitions. + */ + public static final ContainerAllocation PARTITION_SKIPPED = + new ContainerAllocation(null, null, AllocationState.PARTITION_SKIPPED); + RMContainer containerToBeUnreserved; private Resource resourceToBeAllocated = Resources.none(); private AllocationState state; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index cced238b601..1afdba8f766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -156,6 +156,15 @@ private ContainerAllocation preCheckForNodeCandidateSet( ActivityLevel.NODE); return ContainerAllocation.PRIORITY_SKIPPED; } + if (!appInfo.precheckNodePartition(schedulerKey, node, schedulingMode, dcOpt)) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey, + ActivityDiagnosticConstant. 
+ NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS + + ActivitiesManager.getDiagnostics(dcOpt), + ActivityLevel.NODE); + return ContainerAllocation.PARTITION_SKIPPED; + } if (!application.getCSLeafQueue().getReservationContinueLooking()) { if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) { @@ -860,7 +869,11 @@ private ContainerAllocation allocate(Resource clusterResource, result = preCheckForNodeCandidateSet(clusterResource, node, schedulingMode, resourceLimits, schedulerKey); if (null != result) { - continue; + if (result == ContainerAllocation.PARTITION_SKIPPED) { + return result; + } else { + continue; + } } } else { // pre-check when allocating reserved container @@ -928,7 +941,10 @@ public CSAssignment assignContainers(Resource clusterResource, schedulingMode, resourceLimits, schedulerKey, null); AllocationState allocationState = result.getAllocationState(); - if (allocationState == AllocationState.PRIORITY_SKIPPED) { + // When the node partition does not match the priority's request, the result + // is AllocationState.PARTITION_SKIPPED; that state should also be skipped. 
+ if (allocationState == AllocationState.PRIORITY_SKIPPED + || allocationState == AllocationState.PARTITION_SKIPPED) { continue; } return getCSAssignmentFromAllocateResult(clusterResource, result, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 5f2bbf0190c..ec6c5fbe066 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -18,17 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import 
org.apache.hadoop.thirdparty.com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -74,6 +69,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; public class TestCapacitySchedulerAsyncScheduling { @@ -130,6 +126,87 @@ public void testAsyncContainerAllocationWithMultiNode() throws Exception { testAsyncContainerAllocation(2); } + private Set toSet(String... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + @Test(timeout = 300000) + public void testAsyncContainerAllocationBasedOnPartition() throws Exception { + CapacitySchedulerConfiguration conf1 = new CapacitySchedulerConfiguration(conf); + conf1.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + conf1.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + "resource-based"); + String policyName = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + conf1.set(policyName, POLICY_CLASS_NAME); + conf1.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + conf1.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 2); + conf1.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 0); + conf1.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf1.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + conf1.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf1.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf1.setCapacity(A, 10); + conf1.setMaximumCapacity(A, 
15); + conf1.setAccessibleNodeLabels(A, toSet("x")); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf1.setCapacity(B, 20); + conf1.setAccessibleNodeLabels(B, toSet("y")); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x", true))); + mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("y", true))); + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + List nms = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + nms.add(rm.registerNode("127.0.0." + i + ":1234", 8000)); + } + + //assign Node Partition name to all nodes + mgr.addLabelsToNode(ImmutableMap.of(nms.get(0).getNodeId(), toSet("x"))); + mgr.addLabelsToNode(ImmutableMap.of(nms.get(1).getNodeId(), toSet("y"))); + + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(200, rm) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withQueue("a") + .withUnmanagedAM(false) + .build(); + RMApp app = MockRMAppSubmitter.submit(rm, data); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nms.get(0)); + am.allocate("*", 1024, 1, new ArrayList(), "x"); + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), 1); + Assert.assertTrue(rm.waitForState(nms.get(0), containerId, + RMContainerState.ALLOCATED)); + rm.close(); + } + public void testAsyncContainerAllocation(int numThreads) throws Exception { conf.setInt( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, -- 2.28.0