diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index b87260ed47e..4b88e99aeab 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -115,6 +115,7 @@ protected void serviceInit(Configuration conf) throws Exception {
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
       setDecommissionedNMs();
+      setUnregisteredNMs();
       printConfiguredHosts();
     } catch (YarnException ex) {
       disableHostsFileReader(ex);
@@ -149,7 +150,7 @@ public void run() {
             if (result != null) {
               decrInactiveNMMetrics(rmNode);
               LOG.info("Removed " +result.getState().toString() + " node "
-                  + result.getHostName() + " from inactive nodes list");
+                  + result.getHostName() + ":" + result.getCommandPort() + " from inactive nodes list");
             }
           }
         } else {
@@ -258,6 +259,20 @@ private void setDecommissionedNMs() {
     }
   }
 
+  private void setUnregisteredNMs() {
+    Set<String> includeList = hostsReader.getHosts();
+    for (final String host : includeList) {
+      if (!hostsReader.getExcludedHosts().contains(host)) {
+        NodeId nodeId = createUnknownNodeId(host);
+        RMNodeImpl rmNode = new RMNodeImpl(nodeId,
+            rmContext, host, -1, -1, new UnknownNode(host),
+            Resource.newInstance(0, 0), "unknown");
+        rmContext.getInactiveRMNodes().put(nodeId, rmNode);
+        rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN));
+      }
+    }
+  }
+
   // Handle excluded nodes based on following rules:
   // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded;
   // Gracefully decommission excluded nodes that are not already
@@ -274,7 +289,10 @@ private void handleExcludeNodeList(boolean graceful, int timeout) {
     Set<String> includes = hostDetails.getIncludedHosts();
     Map<String, Integer> excludes = hostDetails.getExcludedMap();
 
+    Set<String> activeNodes = new HashSet<>();
     for (RMNode n : this.rmContext.getRMNodes().values()) {
+      activeNodes.add(n.getHostName());
+      activeNodes.add(resolver.resolve(n.getHostName()));
       NodeState s = n.getState();
       // An invalid node (either due to explicit exclude or not include)
       // should be excluded.
@@ -335,6 +353,19 @@ private void handleExcludeNodeList(boolean graceful, int timeout) {
       this.rmContext.getDispatcher().getEventHandler().handle(e);
     }
 
+    Set<String> copyOfIncludes = new HashSet<>(includes);
+    copyOfIncludes.removeAll(activeNodes);
+    copyOfIncludes.removeAll(excludes.keySet());
+    if (!copyOfIncludes.isEmpty()) {
+      for (String host : copyOfIncludes) {
+        NodeId nodeId = createUnknownNodeId(host);
+        RMNodeImpl rmNode = new RMNodeImpl(nodeId,
+            rmContext, host, -1, -1, new UnknownNode(host),
+            Resource.newInstance(0, 0), "unknown");
+        rmContext.getInactiveRMNodes().put(nodeId, rmNode);
+        rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN));
+      }
+    }
     updateInactiveNodes();
   }
 
@@ -528,6 +559,7 @@ private void disableHostsFileReader(Exception ex) {
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
       setDecommissionedNMs();
+      setUnregisteredNMs();
     } catch (IOException ioe2) {
       // Should *never* happen
       this.hostsReader = null;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 012f58a3698..8ed09b64b89 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -581,6 +581,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
           message);
     }
 
+    if (this.rmContext.getInactiveRMNodes().remove(NodesListManager.createUnknownNodeId(rmNode.getHostName())) != null) {
+      ClusterMetrics.getMetrics().decrNumShutdownNMs();
+    }
     // Send ping
     this.nmLivelinessMonitor.receivedPing(nodeId);
     this.decommissioningWatcher.update(rmNode, remoteNodeStatus);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index e94dfe0d861..20f292af624 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -216,6 +216,9 @@
     .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
         RMNodeEventType.DECOMMISSION,
         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+    .addTransition(NodeState.NEW, NodeState.SHUTDOWN,
+        RMNodeEventType.SHUTDOWN,
+        new DeactivateNodeTransition(NodeState.SHUTDOWN))
 
     //Transitions from RUNNING state
     .addTransition(NodeState.RUNNING,
@@ -866,7 +869,11 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       previousRMNode =
          rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
       if (previousRMNode != null) {
-        ClusterMetrics.getMetrics().decrDecommisionedNMs();
+        if (previousRMNode.getState().equals(NodeState.SHUTDOWN)) {
+          ClusterMetrics.getMetrics().decrNumShutdownNMs();
+        } else {
+          ClusterMetrics.getMetrics().decrDecommisionedNMs();
+        }
       }
       // Increment activeNodes explicitly because this is a new node.
       ClusterMetrics.getMetrics().incrNumActiveNodes();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 1fd34a0bb08..e32701d2ad9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -236,6 +237,64 @@ public void testDecommissionWithIncludeHosts() throws Exception {
     rm.stop();
   }
 
+  @Test
+  public void testIncludeHostsWithNoRegister() throws Exception {
+
+    writeToHostsFile("localhost", "host1", "host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    rm = new MockRM(conf);
+    rm.start();
+    rm.drainEvents();
+
+    Assert.assertEquals("NEW nodes should be in SHUTDOWN State first!",
+        3, ClusterMetrics.getMetrics().getNumShutdownNMs());
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    assert(metrics != null);
+    int metricCount = metrics.getNumDecommisionedNMs();
+
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    Assert.assertEquals(
+        "NEW/SHUTDOWN nodes should be in RUNNING State after registration!",
+        3, ClusterMetrics.getMetrics().getNumActiveNMs());
+    Assert.assertEquals(
+        "There should be no SHUTDOWN nodes",
+        0, ClusterMetrics.getMetrics().getNumShutdownNMs());
+    // To test that IPs also work
+    String ip = NetUtils.normalizeHostName("localhost");
+    writeToHostsFile("host1", ip);
+
+    rm.getNodesListManager().refreshNodes(conf);
+
+    checkShutdownNMCount(rm, ++metricCount);
+
+    nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    Assert
+        .assertEquals(2, ClusterMetrics.getMetrics().getNumShutdownNMs());
+
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue("Node is not decommissioned.", NodeAction.SHUTDOWN
+        .equals(nodeHeartbeat.getNodeAction()));
+
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
+        .getNumShutdownNMs());
+    rm.stop();
+  }
+
   /**
    * Decommissioning using a pre-configured exclude hosts file
    */
@@ -2210,6 +2269,7 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
     latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
     rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+    rm.drainEvents();
     Assert.assertEquals("Node should have been forgotten!",
         rmNode, null);
     Assert.assertEquals("Shutdown nodes should be 0 now",
         metrics.getNumShutdownNMs(), 0);
@@ -2240,13 +2300,17 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
     writeToHostsFile("host1", ip, "host2");
     refreshNodesOption(doGraceful, conf);
     nm2 = rm.registerNode("host2:5678", 10240);
+    rm.drainEvents();
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     rm.drainEvents();
+    int waitCount = 0;
+    rm.drainEvents();
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
-    Assert.assertEquals("Shutdown nodes should be 0 now",
-        metrics.getNumShutdownNMs(), 0);
     Assert.assertEquals("All 3 nodes should be active",
         metrics.getNumActiveNMs(), 3);
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+
     //Decommission this node, check timer doesn't remove it
     writeToHostsFile("host1", "host2", ip);
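
Note (not part of the patch): a minimal usage sketch of the behaviour the change introduces, reusing the MockRM, ClusterMetrics and hosts-file helpers that testIncludeHostsWithNoRegister above already relies on. A host listed in the include file that has never registered is tracked as an inactive SHUTDOWN node; once it registers and heartbeats, the SHUTDOWN count is decremented and the node is counted as active. The host name, ports and memory size below are illustrative only.

    // Sketch only: include file lists a single host that has not registered yet.
    writeToHostsFile("host1");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();
    rm.drainEvents();
    // The unregistered include-listed host shows up as an inactive SHUTDOWN node.
    Assert.assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());

    // After registration and a heartbeat the SHUTDOWN count is cleared again.
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    rm.drainEvents();
    nm1.nodeHeartbeat(true);
    rm.drainEvents();
    Assert.assertEquals(1, ClusterMetrics.getMetrics().getNumActiveNMs());
    Assert.assertEquals(0, ClusterMetrics.getMetrics().getNumShutdownNMs());
    rm.stop();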