From 445fc29d92111462f177ee0358e352c640c3a34f Mon Sep 17 00:00:00 2001
From: "tanu.ajmera"
Date: Thu, 28 Jan 2021 13:55:04 +0530
Subject: [PATCH] YARN-10589. Improve logic of multi-node allocation

---
 .../capacity/allocator/AllocationState.java   |  3 +-
 .../allocator/ContainerAllocation.java        |  6 ++
 .../allocator/RegularContainerAllocator.java  |  7 +-
 .../TestCapacitySchedulerAsyncScheduling.java | 95 +++++++++++++++++--
 4 files changed, 100 insertions(+), 11 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
index d1580bd995d..35ddffe19f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
@@ -24,5 +24,6 @@
   LOCALITY_SKIPPED,
   QUEUE_SKIPPED,
   ALLOCATED,
-  RESERVED
+  RESERVED,
+  PARTITION_SKIPPED
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
index b9b9bcff2f6..6199ac22a19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
@@ -54,6 +54,12 @@
   public static final ContainerAllocation QUEUE_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
 
+  /**
+   * Skip the partition, and look at the other partitions of nodes
+   */
+  public static final ContainerAllocation PARTITION_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.PARTITION_SKIPPED);
+
   RMContainer containerToBeUnreserved;
   private Resource resourceToBeAllocated = Resources.none();
   private AllocationState state;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index cced238b601..b6174aaf426 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -154,7 +154,7 @@ private ContainerAllocation preCheckForNodeCandidateSet(
               NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS
               + ActivitiesManager.getDiagnostics(dcOpt),
           ActivityLevel.NODE);
-      return ContainerAllocation.PRIORITY_SKIPPED;
+      return ContainerAllocation.PARTITION_SKIPPED;
     }
 
     if (!application.getCSLeafQueue().getReservationContinueLooking()) {
@@ -875,6 +875,11 @@ private ContainerAllocation allocate(Resource clusterResource,
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,
           resourceLimits, schedulerKey, reservedContainer);
 
+      if (AllocationState.PARTITION_SKIPPED == result.getAllocationState()) {
+        result = ContainerAllocation.PARTITION_SKIPPED;
+        break;
+      }
+
       if (AllocationState.ALLOCATED == result.getAllocationState()) {
         result = doAllocation(result, node, schedulerKey, reservedContainer);
         break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 5f2bbf0190c..44a1b51b9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -18,17 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -74,6 +69,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestCapacitySchedulerAsyncScheduling {
@@ -130,6 +126,87 @@ public void testAsyncContainerAllocationWithMultiNode() throws Exception {
     testAsyncContainerAllocation(2);
   }
 
+  private Set<String> toSet(String... elements) {
+    Set<String> set = Sets.newHashSet(elements);
+    return set;
+  }
+
+  @Test(timeout = 300000)
+  public void testAsyncContainerAllocationBasedOnPartition() throws Exception {
+    CapacitySchedulerConfiguration conf1 = new CapacitySchedulerConfiguration(conf);
+    conf1.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+        "resource-based");
+    conf1.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+        "resource-based");
+    String policyName =
+        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+            + ".resource-based" + ".class";
+    conf1.set(policyName, POLICY_CLASS_NAME);
+    conf1.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+        true);
+    conf1.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        2);
+    conf1.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 0);
+    conf1.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf1.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf1.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf1.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf1.setCapacity(A, 10);
+    conf1.setMaximumCapacity(A, 15);
+    conf1.setAccessibleNodeLabels(A, toSet("x"));
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf1.setCapacity(B, 20);
+    conf1.setAccessibleNodeLabels(B, toSet("y"));
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x", true)));
+    mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("y", true)));
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 8000));
+    }
+
+    //assign Node Partition name to all nodes
+    mgr.addLabelsToNode(ImmutableMap.of(nms.get(0).getNodeId(), toSet("x")));
+    mgr.addLabelsToNode(ImmutableMap.of(nms.get(1).getNodeId(), toSet("y")));
+
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("a")
+            .withUnmanagedAM(false)
+            .build();
+    RMApp app = MockRMAppSubmitter.submit(rm, data);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nms.get(0));
+    am.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+    ContainerId containerId =
+        ContainerId.newContainerId(am.getApplicationAttemptId(), 1);
+    Assert.assertTrue(rm.waitForState(nms.get(0), containerId,
+        RMContainerState.ALLOCATED));
+    rm.close();
+  }
+
   public void testAsyncContainerAllocation(int numThreads) throws Exception {
     conf.setInt(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
-- 
2.28.0