From 2c625ad1a696b0fadd9095f09561405b2fcfd80d Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Fri, 17 Jul 2020 18:50:31 +0530 Subject: [PATCH] YARN-10352. Skip schedule on not heartbeated nodes in Multi Node Placement --- .../scheduler/AbstractYarnScheduler.java | 1 + .../scheduler/ClusterNodeTracker.java | 45 ++++++++++++++++++++++ .../scheduler/capacity/CapacityScheduler.java | 5 ++- .../scheduler/placement/MultiNodeSorter.java | 4 +- .../capacity/TestCapacitySchedulerMultiNodes.java | 36 +++++++++++++++++ 5 files changed, 87 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 70d2714..d96a53f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -215,6 +215,7 @@ public void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); nodeTracker.setConfiguredMaxAllocationWaitTime( configuredMaximumAllocationWaitTime); + nodeTracker.setNMHeartbeatInterval(nmHeartbeatInterval); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); if (!migration) { this.releaseCache = new Timer("Pending Container Clear Timer"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 50c45fc..15ee5c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -24,6 +24,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -36,6 +37,7 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -72,6 +74,9 @@
   private boolean forceConfiguredMaxAllocation = true;
   private long configuredMaxAllocationWaitTime;
   private boolean reportedMaxAllocation = false;
+  private long nmHeartbeatInterval;
+
+  private static boolean printVerboseLogging = false;
 
   public ClusterNodeTracker() {
     maxAllocation = new long[ResourceUtils.getNumberOfCountableResourceTypes()];
@@ -270,6 +275,16 @@ public void setForceConfiguredMaxAllocation(boolean flag) {
     }
   }
 
+  /**
+   * Sets the NM heartbeat interval that getNodesHeartbeated uses to decide
+   * whether a node has missed too many heartbeats.
+   *
+   * @param nmHeartbeatInterval heartbeat interval in milliseconds
+   */
+  public void setNMHeartbeatInterval(long nmHeartbeatInterval) {
+    this.nmHeartbeatInterval = nmHeartbeatInterval;
+  }
+
   private void updateMaxResources(SchedulerNode node, boolean add) {
     Resource totalResource = node.getTotalResource();
     ResourceInformation[] totalResources;
@@ -493,4 +508,59 @@ public void updateNodesPerPartition(String partition, Set<NodeId> nodeIds) {
     }
     return nodesPerPartition;
   }
+
+  /**
+   * Returns the nodes of the given partition that have heartbeated recently.
+   * A node is left out once more than two NM heartbeat intervals have elapsed
+   * since its last heartbeat, because it might be dead and no more containers
+   * should be allocated on it.
+   *
+   * @param partition node partition to look up
+   * @return the recently heartbeated nodes of the partition (possibly empty),
+   *         or null when getNodesPerPartition returns null for the partition
+   */
+  public List<N> getNodesHeartbeated(String partition) {
+    // NOTE(review): stale nodes are removed in place from this list; this
+    // presumes getNodesPerPartition hands out a private copy - confirm.
+    List<N> nodesPerPartition = getNodesPerPartition(partition);
+    if (nodesPerPartition == null) {
+      return null;
+    }
+
+    // To avoid too verbose DEBUG logging, only print debug log once for
+    // every 10 secs.
+    boolean printSkipedNodeLogging = false;
+    if (LOG.isDebugEnabled()) {
+      if (Time.monotonicNow() / 1000 % 10 == 0) {
+        printSkipedNodeLogging = (!printVerboseLogging);
+      } else {
+        printVerboseLogging = false;
+      }
+    }
+
+    ListIterator<N> nodesIter = nodesPerPartition.listIterator();
+    while (nodesIter.hasNext()) {
+      N node = nodesIter.next();
+      // Skip node which missed 2 heartbeats since the node might be dead and
+      // we should not continue to allocate containers on it.
+      long timeElapsedFromLastHeartbeat =
+          Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+      if (timeElapsedFromLastHeartbeat > this.nmHeartbeatInterval * 2) {
+        if (LOG.isDebugEnabled() && printSkipedNodeLogging) {
+          LOG.debug("Skip scheduling on node " + node.getNodeID()
+              + " because it haven't heartbeated for "
+              + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
+        }
+        nodesIter.remove();
+      }
+    }
+
+    // Mark the current logging window as reported. Without this the flag
+    // would stay false forever and every call made during the window would
+    // log all skipped nodes again.
+    if (printSkipedNodeLogging) {
+      printVerboseLogging = true;
+    }
+    return nodesPerPartition;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bd2acd7..66a361f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -513,7 +513,8 @@ private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) { if (printVerboseLog && LOG.isDebugEnabled()) { - LOG.debug("Skip scheduling on node because it haven't heartbeated for " + LOG.debug("Skip scheduling on node " + node.getNodeID() + + " because it haven't heartbeated for " + timeElapsedFromLastHeartbeat / 1000.0f + " secs"); } return true; @@ -1495,7 +1496,7 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, if (multiNodePlacementEnabled) { Map nodesByPartition = new HashMap<>(); List nodes = nodeTracker - .getNodesPerPartition(node.getPartition()); + .getNodesHeartbeated(node.getPartition()); if (nodes != null && !nodes.isEmpty()) { nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); candidates = new SimpleCandidateNodeSet( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index a757ea5..75b092b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -134,8 +134,8 @@ public void reSortClusterNodes() { for (String 
label : nodeLabels) {
       Map<NodeId, SchedulerNode> nodesByPartition = new HashMap<>();
       List<SchedulerNode> nodes = ((AbstractYarnScheduler) rmContext
-          .getScheduler()).getNodeTracker().getNodesPerPartition(label);
-      if (nodes != null && !nodes.isEmpty()) {
+          .getScheduler()).getNodeTracker().getNodesHeartbeated(label);
+      if (nodes != null) {
         nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
         multiNodePolicy.addAndRefreshNodesSet(
             (Collection<N>) nodesByPartition.values(), label);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index 29de815..cebdcd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -437,4 +437,43 @@ public void run() {
 
     rm1.close();
   }
+
+  /**
+   * Verifies that re-sorting drops every node from the multi node lookup
+   * policy once the nodes have missed more than two heartbeat intervals.
+   */
+  @Test
+  public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+      rm.registerNode("127.0.0.1:1234", 10 * GB);
+      rm.registerNode("127.0.0.2:1234", 10 * GB);
+      rm.registerNode("127.0.0.3:1234", 10 * GB);
+      rm.registerNode("127.0.0.4:1234", 10 * GB);
+
+      ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+      waitforNMRegistered(scheduler, 4, 5);
+      MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+          .getMultiNodeSortingManager();
+      MultiNodeSorter<SchedulerNode> sorter = mns
+          .getMultiNodePolicy(POLICY_CLASS_NAME);
+      sorter.reSortClusterNodes();
+      Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
+          .getNodesPerPartition("");
+      Assert.assertEquals(4, nodes.size());
+
+      // Validate the count after missing 3 node heartbeats
+      Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3);
+
+      sorter.reSortClusterNodes();
+      nodes = sorter.getMultiNodeLookupPolicy()
+          .getNodesPerPartition("");
+      Assert.assertEquals(0, nodes.size());
+    } finally {
+      // Stop the RM even when an assertion above fails.
+      rm.stop();
+    }
+  }
+
 }
-- 
2.7.4 (Apple Git-66)