From bfb3bd018d6dbe5cfec77a80d69c4b3baa51deb1 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Mon, 20 Jul 2020 15:47:00 +0530 Subject: [PATCH] YARN-10352. Skip schedule on not heartbeated nodes in Multi Node Placement --- .../scheduler/capacity/CapacityScheduler.java | 67 +++++++++++++++------- .../scheduler/placement/MultiNodeSorter.java | 2 +- .../placement/MultiNodeSortingManager.java | 23 +++++++- .../capacity/TestCapacitySchedulerMultiNodes.java | 38 ++++++++++++ ...tCapacitySchedulerMultiNodesWithPreemption.java | 1 + 5 files changed, 108 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index bd2acd7..81f1d64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -229,8 +229,7 @@ public Configuration getConf() { private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; private boolean multiNodePlacementEnabled; - - private static boolean printedVerboseLoggingForAsyncScheduling = false; + private boolean printedVerboseLoggingForAsyncScheduling; /** * EXPERT @@ -513,7 +512,8 @@ private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) { if (printVerboseLog && LOG.isDebugEnabled()) { - LOG.debug("Skip scheduling on node because it haven't heartbeated for " + LOG.debug("Skip scheduling on node " + node.getNodeID() + + " because it haven't heartbeated for " + timeElapsedFromLastHeartbeat / 1000.0f + " secs"); } return true; @@ -521,6 +521,20 @@ private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, return false; } + private static boolean isPrintSkippedNodeLogging(CapacityScheduler cs) { + // To avoid too verbose DEBUG logging, only print debug log once for + // every 10 secs. + boolean printSkipedNodeLogging = false; + if (LOG.isDebugEnabled()) { + if (Time.monotonicNow() / 1000 % 10 == 0) { + printSkipedNodeLogging = (!cs.printedVerboseLoggingForAsyncScheduling); + } else { + cs.printedVerboseLoggingForAsyncScheduling = false; + } + } + return printSkipedNodeLogging; + } + /** * Schedule on all nodes by starting at a random point. * @param cs @@ -537,20 +551,12 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ return; } int start = random.nextInt(nodeSize); - - // To avoid too verbose DEBUG logging, only print debug log once for - // every 10 secs. - boolean printSkipedNodeLogging = false; - if (Time.monotonicNow() / 1000 % 10 == 0) { - printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling); - } else { - printedVerboseLoggingForAsyncScheduling = false; - } + boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs); // Allocate containers of node [start, end) for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { - if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) { continue; } cs.allocateContainersToNode(node.getNodeID(), false); @@ -564,14 +570,14 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ if (current++ > start) { break; } - if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) { continue; } cs.allocateContainersToNode(node.getNodeID(), false); } - if (printSkipedNodeLogging) { - printedVerboseLoggingForAsyncScheduling = true; + if (printSkippedNodeLogging) { + cs.printedVerboseLoggingForAsyncScheduling = true; } Thread.sleep(cs.getAsyncScheduleInterval()); @@ -1488,16 +1494,35 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, || assignedContainers < maxAssignPerHeartbeat); } + private Map getNodesHeartbeated(String partition) { + Map nodesByPartition = new HashMap<>(); + boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(this); + + List nodesPerPartition = nodeTracker + .getNodesPerPartition(partition); + if (nodesPerPartition == null) { + return nodesByPartition; + } + + for (FiCaSchedulerNode node : nodesPerPartition) { + if (!shouldSkipNodeSchedule(node, this, printSkippedNodeLogging)) { + nodesByPartition.put(node.getNodeID(), node); + } + } + if (printSkippedNodeLogging) { + printedVerboseLoggingForAsyncScheduling = true; + } + return nodesByPartition; + } + private CandidateNodeSet getCandidateNodeSet( FiCaSchedulerNode node) { CandidateNodeSet candidates = null; candidates = new SimpleCandidateNodeSet<>(node); if (multiNodePlacementEnabled) { - Map nodesByPartition = new HashMap<>(); - List nodes = nodeTracker - .getNodesPerPartition(node.getPartition()); - if (nodes != null && !nodes.isEmpty()) { - nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + Map nodesByPartition = + getNodesHeartbeated(node.getPartition()); + if (!nodesByPartition.isEmpty()) { candidates = new SimpleCandidateNodeSet( nodesByPartition, node.getPartition()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index a757ea5..3e3a73f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -135,7 +135,7 @@ public void reSortClusterNodes() { Map nodesByPartition = new HashMap<>(); List nodes = ((AbstractYarnScheduler) rmContext .getScheduler()).getNodeTracker().getNodesPerPartition(label); - if (nodes != null && !nodes.isEmpty()) { + if (nodes != null) { nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); multiNodePolicy.addAndRefreshNodesSet( (Collection) nodesByPartition.values(), label); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java index c8a7e66..f5ea3d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -24,8 +24,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; + import org.apache.commons.collections.IteratorUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -48,6 +53,7 @@ private Set policySpecs = new HashSet(); private Configuration conf; private boolean multiNodePlacementEnabled; + private long nmHeartbeatInterval; public MultiNodeSortingManager() { super("MultiNodeSortingManager"); @@ -59,6 +65,9 @@ public void serviceInit(Configuration configuration) throws Exception { LOG.info("Initializing NodeSortingService=" + getName()); super.serviceInit(configuration); this.conf = configuration; + nmHeartbeatInterval = this.conf.getLong( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); } @Override @@ -106,6 +115,17 @@ public void registerMultiNodePolicyNames( .join(policySpecs.iterator(), ",")); } + // Skip node which missed 2 heartbeats since the node might be dead and + // we should not continue allocate containers on that. + private Predicate heartbeatFilter = new Predicate() { + @Override + public boolean apply(final N node) { + long timeElapsedFromLastHeartbeat = + Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); + return timeElapsedFromLastHeartbeat <= (nmHeartbeatInterval * 2); + } + }; + public Iterator getMultiNodeSortIterator(Collection nodes, String partition, String policyName) { // nodeLookupPolicy can be null if app is configured with invalid policy. @@ -134,6 +154,7 @@ public void registerMultiNodePolicyNames( policy.addAndRefreshNodesSet(nodes, partition); } - return policy.getPreferredNodeIterator(nodes, partition); + return Iterators.filter(policy.getPreferredNodeIterator(nodes, partition), + heartbeatFilter); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java index 29de815..b20f8e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Iterators; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -437,4 +440,39 @@ public void run() { rm1.close(); } + + @Test + public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + rm.registerNode("127.0.0.1:1234", 10 * GB); + rm.registerNode("127.0.0.2:1234", 10 * GB); + rm.registerNode("127.0.0.3:1234", 10 * GB); + rm.registerNode("127.0.0.4:1234", 10 * GB); + + Set nodes = new HashSet<>(); + String partition = ""; + + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + + Iterator nodeIterator = mns.getMultiNodeSortIterator( + nodes, partition, POLICY_CLASS_NAME); + Assert.assertEquals(4, Iterators.size(nodeIterator)); + + // Validate the count after missing 3 node heartbeats + Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3); + + nodeIterator = mns.getMultiNodeSortIterator( + nodes, partition, POLICY_CLASS_NAME); + Assert.assertEquals(0, Iterators.size(nodeIterator)); + + rm.stop(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java index 65e0a17..e1435ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java @@ -111,6 +111,7 @@ public void setUp() { conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 60000); } @Test(timeout=60000) -- 2.7.4 (Apple Git-66)