From 81610542295149189761dc8eafefae353e6ddcc9 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Wed, 22 Jul 2020 15:40:37 +0530 Subject: [PATCH] YARN-10360. Support Multi Node Placement in SingleConstraintAppPlacementAllocator --- .../TestDSWithMultipleNodeManager.java | 54 +++++++++++++++++++++- .../distributedshell/TestDistributedShell.java | 13 ++++-- .../scheduler/placement/AppPlacementAllocator.java | 40 +++++++++++++++- .../placement/LocalityAppPlacementAllocator.java | 37 --------------- .../SingleConstraintAppPlacementAllocator.java | 16 ------- 5 files changed, 99 insertions(+), 61 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java index f3571a6..59a48f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.Iterator; @@ -29,32 +31,82 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Test for Distributed Shell With Multiple Node Managers. + * Parameter 0 tests with Single Node Placement and + * parameter 1 tests with Multiple Node Placement. + */ +@RunWith(value = Parameterized.class) public class TestDSWithMultipleNodeManager { private static final Logger LOG = LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class); static final int NUM_NMS = 2; TestDistributedShell distShellTest; + private final Boolean multiNodePlacementEnabled; + private static final String POLICY_CLASS_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement." + + "ResourceUsageMultiNodeLookupPolicy"; + + + @Parameterized.Parameters + public static Collection getParams() { + return Arrays.asList(false, true); + } + + public TestDSWithMultipleNodeManager(Boolean multiNodePlacementEnabled) { + this.multiNodePlacementEnabled = multiNodePlacementEnabled; + } + + private YarnConfiguration getConfiguration(boolean multiNodePlacementEnabled) { + YarnConfiguration conf = new YarnConfiguration(); + if (multiNodePlacementEnabled) { + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + "resource-based"); + String policyName = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + conf.set(policyName, POLICY_CLASS_NAME); + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + } + return conf; + } @Before public void setup() throws Exception { distShellTest = new TestDistributedShell(); - distShellTest.setupInternal(NUM_NMS); + distShellTest.setupInternal(NUM_NMS, + getConfiguration(multiNodePlacementEnabled)); } @After diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 0ecb841..babfbed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -141,18 +141,21 @@ @Before public void setup() throws Exception { - setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion()); + setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion(), + new YarnConfiguration()); } - protected void setupInternal(int numNodeManager) throws Exception { - setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION); + protected void setupInternal(int numNodeManager, + YarnConfiguration conf) throws Exception { + setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION, conf); } - private void setupInternal(int numNodeManager, float timelineVersion) + private void setupInternal(int numNodeManager, float timelineVersion, + YarnConfiguration conf) throws Exception { LOG.info("Starting up YARN cluster"); - conf = new YarnConfiguration(); + this.conf = conf; conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, MIN_ALLOCATION_MB); // reduce the teardown waiting time diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index d71b9a0..b1b3402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; +import org.apache.commons.collections.IteratorUtils; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector; @@ -26,9 +27,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Iterator; @@ -59,14 +63,35 @@ protected SchedulerRequestKey schedulerRequestKey; protected RMContext rmContext; private AtomicInteger placementAttempt = new AtomicInteger(0); + private MultiNodeSortingManager multiNodeSortingManager = null; + private String multiNodeSortPolicyName; + + private static final Logger LOG = + LoggerFactory.getLogger(AppPlacementAllocator.class); /** * Get iterator of preferred node depends on requirement and/or availability. * @param candidateNodeSet input CandidateNodeSet * @return iterator of preferred node */ - public abstract Iterator getPreferredNodeIterator( - CandidateNodeSet candidateNodeSet); + public Iterator getPreferredNodeIterator( + CandidateNodeSet candidateNodeSet) { + // Now only handle the case that single node in the candidateNodeSet + // TODO, Add support to multi-hosts inside candidateNodeSet which is passed + // in. + + N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); + if (singleNode != null) { + return IteratorUtils.singletonIterator(singleNode); + } + + // singleNode will be null if Multi-node placement lookup is enabled, and + // hence could consider sorting policies. + return multiNodeSortingManager.getMultiNodeSortIterator( + candidateNodeSet.getAllNodes().values(), + candidateNodeSet.getPartition(), + multiNodeSortPolicyName); + } /** * Replace existing pending asks by the new requests @@ -200,6 +225,17 @@ public void initialize(AppSchedulingInfo appSchedulingInfo, this.appSchedulingInfo = appSchedulingInfo; this.rmContext = rmContext; this.schedulerRequestKey = schedulerRequestKey; + multiNodeSortPolicyName = appSchedulingInfo + .getApplicationSchedulingEnvs().get( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS); + multiNodeSortingManager = (MultiNodeSortingManager) rmContext + .getMultiNodeSortingManager(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "nodeLookupPolicy used for " + appSchedulingInfo.getApplicationId() + + " is " + ((multiNodeSortPolicyName != null) + ? multiNodeSortPolicyName : "")); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index a91e872..a440dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -60,8 +59,6 @@ new ConcurrentHashMap<>(); private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; - private MultiNodeSortingManager multiNodeSortingManager = null; - private String multiNodeSortPolicyName; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -77,40 +74,6 @@ public LocalityAppPlacementAllocator() { public void initialize(AppSchedulingInfo appSchedulingInfo, SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); - multiNodeSortPolicyName = appSchedulingInfo - .getApplicationSchedulingEnvs().get( - ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS); - multiNodeSortingManager = (MultiNodeSortingManager) rmContext - .getMultiNodeSortingManager(); - if (LOG.isDebugEnabled()) { - LOG.debug( - "nodeLookupPolicy used for " + appSchedulingInfo - .getApplicationId() - + " is " + ((multiNodeSortPolicyName != null) ? - multiNodeSortPolicyName : - "")); - } - } - - @Override - @SuppressWarnings("unchecked") - public Iterator getPreferredNodeIterator( - CandidateNodeSet candidateNodeSet) { - // Now only handle the case that single node in the candidateNodeSet - // TODO, Add support to multi-hosts inside candidateNodeSet which is passed - // in. - - N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); - if (singleNode != null) { - return IteratorUtils.singletonIterator(singleNode); - } - - // singleNode will be null if Multi-node placement lookup is enabled, and - // hence could consider sorting policies. - return multiNodeSortingManager.getMultiNodeSortIterator( - candidateNodeSet.getAllNodes().values(), - candidateNodeSet.getPartition(), - multiNodeSortPolicyName); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 9898051..412c30e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -79,22 +79,6 @@ public SingleConstraintAppPlacementAllocator() { } @Override - @SuppressWarnings("unchecked") - public Iterator getPreferredNodeIterator( - CandidateNodeSet candidateNodeSet) { - // Now only handle the case that single node in the candidateNodeSet - // TODO, Add support to multi-hosts inside candidateNodeSet which is passed - // in. - - N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); - if (null != singleNode) { - return IteratorUtils.singletonIterator(singleNode); - } - - return IteratorUtils.emptyIterator(); - } - - @Override public PendingAskUpdateResult updatePendingAsk( Collection requests, boolean recoverPreemptedRequestForAContainer) { -- 2.7.4 (Apple Git-66)