diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 4d9922b..b887fed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.Node; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -95,7 +97,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMsMetrics(); + setDecomissionedNMs(); printConfiguredHosts(); } catch (YarnException ex) { disableHostsFileReader(ex); @@ -157,9 +159,24 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, } } - private void setDecomissionedNMsMetrics() { + private void setDecomissionedNMs() { Set excludeList = hostsReader.getExcludedHosts(); - 
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size()); + for (final String host : excludeList) { + DummyNodeId nodeId = new DummyNodeId(host); + RMNodeImpl rmNode = new RMNodeImpl(nodeId, + rmContext, host, -1, -1, new DummyNode(host), null, null); + + RMNode prevRMNode = + rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); + if (prevRMNode != null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(prevRMNode.getNodeID(), + RMNodeEventType.DECOMMISSION)); + } else { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } } @VisibleForTesting @@ -334,7 +351,7 @@ private void disableHostsFileReader(Exception ex) { conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMsMetrics(); + setDecomissionedNMs(); } catch (IOException ioe2) { // Should *never* happen this.hostsReader = null; @@ -417,4 +434,93 @@ public void refreshNodesForcefully() { } } } + + /** + * A Dummy NodeId needed upon startup for populating inactive nodes Map. 
+ */ + public static class DummyNodeId extends NodeId { + + private String host; + + public DummyNodeId(String host) { + this.host = host; + } + + @Override + public String getHost() { + return this.host; + } + + @Override + protected void setHost(String hst) { + + } + + @Override + public int getPort() { + return -1; + } + + @Override + protected void setPort(int port) { + + } + + @Override + protected void build() { + + } + } + + private static class DummyNode implements Node { + + private String host; + + public DummyNode(String host) { + this.host = host; + } + + @Override + public String getNetworkLocation() { + return null; + } + + @Override + public void setNetworkLocation(String location) { + + } + + @Override + public String getName() { + return host; + } + + @Override + public Node getParent() { + return null; + } + + @Override + public void setParent(Node parent) { + + } + + @Override + public int getLevel() { + return 0; + } + + @Override + public void setLevel(int i) { + + } + + public String getHost() { + return host; + } + + public void setHost(String hst) { + this.host = hst; + } + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3873e5f..85475fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import 
org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -172,6 +173,9 @@ .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) //Transitions from RUNNING state .addTransition(NodeState.RUNNING, @@ -691,6 +695,8 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, case UNHEALTHY: metrics.decrNumUnhealthyNMs(); break; + case NEW: + break; default: LOG.warn("Unexpected initial state"); } @@ -766,14 +772,21 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; - - NodeId nodeId = rmNode.nodeId; + RMNode previousRMNode; + final NodeId nodeId = rmNode.nodeId; if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) { // Old node rejoining - RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId); + previousRMNode = rmNode.context.getInactiveRMNodes().get(nodeId); rmNode.context.getInactiveRMNodes().remove(nodeId); - rmNode.updateMetricsForRejoinedNode(previouRMNode.getState()); + rmNode.updateMetricsForRejoinedNode(previousRMNode.getState()); } else { + NodesListManager.DummyNodeId dummyNodeId = + new NodesListManager.DummyNodeId(nodeId.getHost()); + previousRMNode = rmNode.context.getInactiveRMNodes().get(dummyNodeId); + if (previousRMNode != null) { + rmNode.context.getInactiveRMNodes().remove(dummyNodeId); + 
ClusterMetrics.getMetrics().decrDecommisionedNMs(); + } // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index bad68f4..d9aea90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1922,8 +1922,15 @@ protected Dispatcher createDispatcher() { ClusterMetrics.getMetrics().getNumDecommisionedNMs()); // restart RM. 
- rm2 = new MockRM(conf); + final DrainDispatcher dispatcher1 = new DrainDispatcher(); + rm2 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher1; + } + }; rm2.start(); + dispatcher1.await(); Assert .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index e42ed91..e711c4b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -112,7 +112,7 @@ public void testGetNextHeartBeatInterval() throws Exception { @Test public void testDecommissionWithIncludeHosts() throws Exception { - writeToHostsFile("localhost", "host1", "host2"); + writeToHostsFile(hostFile, "localhost", "host1", "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); @@ -137,7 +137,7 @@ public void testDecommissionWithIncludeHosts() throws Exception { // To test that IPs also work String ip = NetUtils.normalizeHostName("localhost"); - writeToHostsFile("host1", ip); + writeToHostsFile(hostFile, "host1", ip); rm.getNodesListManager().refreshNodes(conf); @@ -167,7 +167,7 @@ public void testDecommissionWithExcludeHosts() throws Exception { conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile .getAbsolutePath()); - writeToHostsFile(""); + writeToHostsFile(hostFile, ""); final 
DrainDispatcher dispatcher = new DrainDispatcher(); rm = new MockRM(conf) { @Override @@ -192,7 +192,7 @@ protected Dispatcher createDispatcher() { // To test that IPs also work String ip = NetUtils.normalizeHostName("localhost"); - writeToHostsFile("host2", ip); + writeToHostsFile(hostFile, "host2", ip); rm.getNodesListManager().refreshNodes(conf); @@ -209,7 +209,7 @@ protected Dispatcher createDispatcher() { NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); dispatcher.await(); - writeToHostsFile(""); + writeToHostsFile(hostFile, ""); rm.getNodesListManager().refreshNodes(conf); nm3 = rm.registerNode("localhost:4433", 1024); @@ -243,7 +243,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception { Assert.assertEquals( NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); - writeToHostsFile("host1"); + writeToHostsFile(hostFile, "host1"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); @@ -280,7 +280,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception { Assert.assertEquals( NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); @@ -298,7 +298,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception { @Test public void testNodeRegistrationSuccess() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); @@ -322,7 +322,7 @@ public void testNodeRegistrationSuccess() throws Exception { @Test public void testNodeRegistrationWithLabels() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); 
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -371,7 +371,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test public void testNodeRegistrationWithInvalidLabels() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -424,7 +424,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -477,7 +477,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test public void testNodeRegistrationWithCentralLabelConfig() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -536,7 +536,7 @@ private NodeStatus getNodeStatusObject(NodeId nodeId) { @Test public void testNodeHeartBeatWithLabels() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -621,7 +621,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test public void testNodeHeartBeatWithInvalidLabels() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -681,7 +681,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test public void testNodeHeartbeatWithCentralLabelConfig() throws Exception { - 
writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); @@ -738,7 +738,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { @Test public void testNodeRegistrationVersionLessThanRM() throws Exception { - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); @@ -768,7 +768,7 @@ public void testNodeRegistrationVersionLessThanRM() throws Exception { @Test public void testNodeRegistrationFailure() throws Exception { - writeToHostsFile("host1"); + writeToHostsFile(hostFile, "host1"); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); @@ -802,7 +802,7 @@ public void testSetRMIdentifierInRegistration() throws Exception { // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse Assert.assertEquals(ResourceManager.getClusterTimeStamp(), - response.getRMIdentifier()); + response.getRMIdentifier()); } @Test @@ -1155,7 +1155,7 @@ public void testInvalidNMUnregistration() throws Exception { MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService); RegisterNodeManagerResponse response = nm1.registerNode(); Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); - writeToHostsFile("host2"); + writeToHostsFile(hostFile, "host2"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); @@ -1174,7 +1174,7 @@ public void testInvalidNMUnregistration() throws Exception { MockNM nm2 = new MockNM("host2:1234", 5120, resourceTrackerService); RegisterNodeManagerResponse response2 = nm2.registerNode(); Assert.assertEquals(NodeAction.NORMAL, response2.getNodeAction()); - writeToHostsFile("host1"); + writeToHostsFile(hostFile, 
"host1"); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); @@ -1184,7 +1184,128 @@ public void testInvalidNMUnregistration() throws Exception { checkDecommissionedNMCount(rm, ++decommisionedNMsCount); } - private void writeToHostsFile(String... hosts) throws IOException { + @Test(timeout = 4000) + public void testInitDecommMetric() throws Exception { + testInitDecommMetricHelper(true); + testInitDecommMetricHelper(false); + } + + public void testInitDecommMetricHelper(boolean hasIncludeList) + throws Exception { + Configuration conf = new Configuration(); + final DrainDispatcher dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + File excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, "host1"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + + if (hasIncludeList) { + writeToHostsFile(hostFile, "host1", "host2"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + } + rm.getNodesListManager().refreshNodes(conf); + rm.stop(); + dispatcher.stop(); + final DrainDispatcher dispatcher1 = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher1; + } + }; + rm1.start(); + nm1 = rm1.registerNode("host1:1234", 5120); + nm2 = rm1.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + dispatcher1.await(); + Assert.assertEquals("Number of Decommissioned nodes should be 1", + 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The 
inactiveRMNodes should contain an entry for the " + + "decommissioned node", + 1, rm1.getRMContext().getInactiveRMNodes().size()); + excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, ""); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + rm1.getNodesListManager().refreshNodes(conf); + nm1 = rm1.registerNode("host1:1234", 5120); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + dispatcher1.await(); + Assert.assertEquals("The decommissioned nodes metric should have " + + "decremented to 0", + 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The active nodes metric should be 2", + 2, ClusterMetrics.getMetrics().getNumActiveNMs()); + Assert.assertEquals("The inactive RMNodes entry should have been removed", + 0, rm1.getRMContext().getInactiveRMNodes().size()); + rm1.stop(); + dispatcher1.stop(); + } + + @Test(timeout = 4000) + public void testInitDecommMetricNoRegistration() throws Exception { + Configuration conf = new Configuration(); + final DrainDispatcher dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + //host3 will not register or heartbeat + File excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, "host3", "host2"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + writeToHostsFile(hostFile, "host1", "host2"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + rm.getNodesListManager().refreshNodes(conf); + dispatcher.await(); + Assert.assertEquals("The decommissioned nodes metric should be 1 ", + 
1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + rm.stop(); + + final DrainDispatcher dispatcher1 = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher1; + } + }; + rm1.start(); + rm1.getNodesListManager().refreshNodes(conf); + dispatcher1.await(); + Assert.assertEquals("The decommissioned nodes metric should be 2 ", + 2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + rm1.stop(); + } + + private void writeToHostsFile(File hostFile, String... hosts) + throws IOException { if (!hostFile.exists()) { TEMP_DIR.mkdirs(); hostFile.createNewFile();