diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 1ad74bf..59f2fb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -41,9 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.*; import com.google.common.annotations.VisibleForTesting; @@ -133,19 +133,61 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); hostsReader.updateFileNames(includesFile, excludesFile); + + Set preExclude = new HashSet( + hostsReader.getExcludedHosts()); + hostsReader.refresh( includesFile.isEmpty() ? null : this.rmContext .getConfigurationProvider().getConfigurationInputStream( this.conf, includesFile), excludesFile.isEmpty() ? null : this.rmContext.getConfigurationProvider() .getConfigurationInputStream(this.conf, excludesFile)); + updateInactiveRMnodes(hostsReader.getExcludedHosts(), preExclude); printConfiguredHosts(); } } + public void updateInactiveRMnodes(Set excludes, + Set preExcludes) { + //remove nodes which not excluded any more + if (preExcludes != null) { + preExcludes.removeAll(excludes); + for (String host : preExcludes) { + NodeId nodeId = NodeId.newInstance(host, 0); + if (null != rmContext.getInactiveRMNodes().remove(nodeId)) { + ClusterMetrics.getMetrics().decrDecommisionedNMs(); + } + } + } + + HashSet excludedNodes = new HashSet<>(excludes); + + //current running nodes will be handle in refreshNodes, ignore them here + for (NodeId nodeId: rmContext.getRMNodes().keySet()) { + excludedNodes.remove(nodeId.getHost()); + //may contain IP address + excludedNodes.remove(NetUtils.normalizeHostName(nodeId.getHost())); + } + + Resource empty = Resource.newInstance(0, 0); + for (String host: excludedNodes) { + NodeId nodeId = NodeId.newInstance(host, 0); //use port 0 to fake a node + if (rmContext.getInactiveRMNodes().containsKey(nodeId)) { + continue; //already excluded + } + + RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, 0, 0, + new NodeBase(host, ""), empty, "N/A"); + rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); + rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } + private void setDecomissionedNMsMetrics() { Set excludeList = hostsReader.getExcludedHosts(); - ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size()); + updateInactiveRMnodes(excludeList, null); } public boolean isValidNode(String hostName) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 09b9278..466c984 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -148,6 +148,8 @@ .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, new ExcludeNodeTransition()) //Transitions from RUNNING state .addTransition(NodeState.RUNNING, @@ -593,6 +595,17 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class ExcludeNodeTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.context.getRMNodes().remove(rmNode.nodeId); + rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode); + ClusterMetrics.getMetrics().incrDecommisionedNMs(); + } + } + public static class ReconnectNodeTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index de17acd..d11946d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1903,8 +1903,15 @@ protected Dispatcher createDispatcher() { ClusterMetrics.getMetrics().getNumDecommisionedNMs()); // restart RM. - rm2 = new MockRM(conf); + final DrainDispatcher dispatcher2 = new DrainDispatcher(); + rm2 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher2; + } + }; rm2.start(); + dispatcher2.await(); Assert .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 94a0e4c..aa8390b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -191,11 +191,12 @@ protected Dispatcher createDispatcher() { // To test that IPs also work String ip = NetUtils.normalizeHostName("localhost"); - writeToHostsFile("host2", ip); + writeToHostsFile("host2", ip, "host3"); rm.getNodesListManager().refreshNodes(conf); - checkDecommissionedNMCount(rm, metricCount + 2); + dispatcher.await(); + checkDecommissionedNMCount(rm, metricCount + 3); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); @@ -210,6 +211,8 @@ protected Dispatcher createDispatcher() { writeToHostsFile(""); rm.getNodesListManager().refreshNodes(conf); + dispatcher.await(); + checkDecommissionedNMCount(rm, metricCount + 2); nm3 = rm.registerNode("localhost:4433", 1024); dispatcher.await();