diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 4d9922b..24818ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.Node; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -95,7 +97,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMsMetrics(); + setDecomissionedNMs(); printConfiguredHosts(); } catch (YarnException ex) { disableHostsFileReader(ex); @@ -157,9 +159,76 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, } } - private void setDecomissionedNMsMetrics() { + private void setDecomissionedNMs() { Set excludeList = hostsReader.getExcludedHosts(); - 
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size()); + for (final String host : excludeList) { + RMNodeImpl rmNode = new RMNodeImpl(new NodeId() { + + @Override + public String getHost() { + return host; + } + + @Override + protected void setHost(String host) { + + } + + @Override + public int getPort() { + return -1; + } + + @Override + protected void setPort(int port) { + + } + + @Override + protected void build() { + + } + }, rmContext, host, -1, -1, new Node() { + @Override + public String getNetworkLocation() { + return null; + } + + @Override + public void setNetworkLocation(String location) { + + } + + @Override + public String getName() { + return host; + } + + @Override + public Node getParent() { + return null; + } + + @Override + public void setParent(Node parent) { + + } + + @Override + public int getLevel() { + return 0; + } + + @Override + public void setLevel(int i) { + + } + }, null, null); + + rmContext.getRMNodes().putIfAbsent(rmNode.getNodeID(),rmNode); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(rmNode.getNodeID(), RMNodeEventType.DECOMMISSION)); + } } @VisibleForTesting @@ -334,7 +403,7 @@ private void disableHostsFileReader(Exception ex) { conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMsMetrics(); + setDecomissionedNMs(); } catch (IOException ioe2) { // Should *never* happen this.hostsReader = null; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 8448287..31e7c98 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -172,6 +172,9 @@ .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) //Transitions from RUNNING state .addTransition(NodeState.RUNNING, @@ -691,6 +694,8 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, case UNHEALTHY: metrics.decrNumUnhealthyNMs(); break; + case NEW: + break; default: LOG.warn("Unexpected initial state"); } @@ -748,7 +753,7 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, SingleArcTransition { @Override - public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + public void transition(final RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; @@ -760,6 +765,15 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getInactiveRMNodes().remove(nodeId); rmNode.updateMetricsForRejoinedNode(previouRMNode.getState()); } else { + for (Map.Entry entry : rmNode.context + .getInactiveRMNodes().entrySet()) { + NodeId dummyInactiveNodeId = entry.getKey(); + if (dummyInactiveNodeId.getHost().equals(nodeId.getHost()) && + dummyInactiveNodeId.getPort() == -1) { + rmNode.context.getInactiveRMNodes().remove(dummyInactiveNodeId); + ClusterMetrics.getMetrics().decrDecommisionedNMs(); + } + } // Increment activeNodes explicitly because this is a new node. 
ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); @@ -969,6 +983,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { private final NodeState finalState; public DeactivateNodeTransition(NodeState finalState) { this.finalState = finalState; + LOG.debug("DeactivateNodeTransition: final state set to " + finalState); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index e42ed91..8d109fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -1184,6 +1184,50 @@ public void testInvalidNMUnregistration() throws Exception { checkDecommissionedNMCount(rm, ++decommisionedNMsCount); } + @Test(timeout = 4000) + public void testInitDecommMetric() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + writeToHostsFile("host1"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + rm.getNodesListManager().refreshNodes(conf); + rm.stop(); + rm = new MockRM(conf); + rm.start(); + nm1 = rm.registerNode("host1:1234", 5120); + nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + 
Thread.sleep(50); + Assert.assertEquals("Number of Decommissioned nodes should be 1", + 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The inactiveRMNodes should contain an entry for the " + + "decommissioned node", 1, rm.getRMContext().getInactiveRMNodes().size()); + + writeToHostsFile(""); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + rm.getNodesListManager().refreshNodes(conf); + nm1 = rm.registerNode("host1:1234", 5120); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + Thread.sleep(50); + Assert.assertEquals("The decommissioned nodes metric should have " + + "decremented to 0", + 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The active nodes metric should be 2", + 2, ClusterMetrics.getMetrics().getNumActiveNMs()); + Assert.assertEquals("The inactive RMNodes entry should have been removed", + 0, rm.getRMContext().getInactiveRMNodes().size()); + } + private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) { TEMP_DIR.mkdirs();