diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index f5943a8..bbf6cce 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -199,6 +199,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
     public ResourceUtilization getNodeUtilization() {
       return null;
     }
+
+    @Override
+    public long getTimeStamp() {
+      return 0;
+    }
+
+    @Override
+    public void setTimeStamp(long timeStamp) {
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index e778188..fa568f1 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -187,4 +187,13 @@ public ResourceUtilization getAggregatedContainersUtilization() {
   public ResourceUtilization getNodeUtilization() {
     return node.getNodeUtilization();
   }
+
+  @Override
+  public long getTimeStamp() {
+    return 0;
+  }
+
+  @Override
+  public void setTimeStamp(long timeStamp) {
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f493fd3..4706abe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -629,6 +629,19 @@ private static void addDeprecatedKeys() {
       "NONE";
 
   /**
+   * Interval (msec) for checking removal of shutdown and decommissioned nodes.
+   */
+  public static final String RM_NODE_REMOVAL_CHK_INTERVAL_MSEC = RM_PREFIX +
+      "node-removal.interval-ms";
+  public static final int DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC = 30000;
+
+  /**
+   * Timeout (msec) for an untracked node to remain in shutdown or
+   * decommissioned state.
+   */
+  public static final String RM_NODE_REMOVAL_TIMEOUT_MSEC = RM_PREFIX +
+      "node-removal.timeout-ms";
+  public static final int DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC = 10000;
+
+  /**
    * RM proxy users' prefix
    */
   public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9bbdb94..8d8675f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2451,4 +2451,20 @@
     <name>yarn.am.blacklisting.disable-failure-threshold</name>
     <value>0.8f</value>
   </property>
+
+  <property>
+    <description>
+      The amount of time (msec) an inactive node can stay in the RMNodes
+      list after being declared untracked.
+    </description>
+    <name>yarn.resourcemanager.node-removal.timeout-ms</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>
+      The interval (msec) for the timer that checks NM candidates
+      that are due for removal from the inactive list.
+    </description>
+    <name>yarn.resourcemanager.node-removal.interval-ms</name>
+    <value>30000</value>
+  </property>
+
 </configuration>
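For reference, a minimal sketch of how the two new properties are read, assuming this patch is applied so the YarnConfiguration constants exist (the class name NodeRemovalConfigExample is illustrative only, not part of the patch):

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class NodeRemovalConfigExample {
      public static void main(String[] args) {
        // Picks up yarn-default.xml / yarn-site.xml from the classpath.
        YarnConfiguration conf = new YarnConfiguration();

        // 30000 ms unless yarn.resourcemanager.node-removal.interval-ms is overridden.
        int checkIntervalMs = conf.getInt(
            YarnConfiguration.RM_NODE_REMOVAL_CHK_INTERVAL_MSEC,
            YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC);

        // 10000 ms unless yarn.resourcemanager.node-removal.timeout-ms is overridden.
        int removalTimeoutMs = conf.getInt(
            YarnConfiguration.RM_NODE_REMOVAL_TIMEOUT_MSEC,
            YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC);

        System.out.println("check every " + checkIntervalMs
            + " ms, remove untracked inactive nodes after " + removalTimeoutMs + " ms");
      }
    }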
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 1e8b98a..e0f9656 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -71,6 +71,7 @@
   private String excludesFile;
 
   private Resolver resolver;
+  private Timer removalTimer;
 
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
@@ -107,9 +108,51 @@ protected void serviceInit(Configuration conf) throws Exception {
     } catch (IOException ioe) {
       disableHostsFileReader(ioe);
     }
+
+    removalTimer = new Timer("Node Removal Timer");
+    int nodeCheckInterval =
+        conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_CHK_INTERVAL_MSEC,
+            YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC);
+    final int nodeRemovalTimeout =
+        conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC);
+    removalTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        for (Map.Entry<NodeId, RMNode> entry :
+            rmContext.getInactiveRMNodes().entrySet()) {
+          if (entry != null) {
+            NodeId nodeId = entry.getKey();
+            RMNode rmNode = entry.getValue();
+            if (rmNode.getTimeStamp() == 0 &&
+                isUntrackedNode(rmNode.getHostName())) {
+              rmNode.setTimeStamp(System.currentTimeMillis());
+            } else if (!isUntrackedNode(rmNode.getHostName())) {
+              rmNode.setTimeStamp(0);
+            } else if (System.currentTimeMillis() - rmNode.getTimeStamp() >
+                nodeRemovalTimeout) {
+              RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+              if (result != null) {
+                ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+                if (rmNode.getState() == NodeState.SHUTDOWN) {
+                  clusterMetrics.decrNumShutdownNMs();
+                } else {
+                  clusterMetrics.decrDecommisionedNMs();
+                }
+              }
+            }
+          }
+        }
+      }
+    }, nodeCheckInterval, nodeCheckInterval);
+
     super.serviceInit(conf);
   }
 
+  @Override
+  public void serviceStop() {
+    removalTimer.cancel();
+    removalTimer.purge();
+  }
+
   private void printConfiguredHosts() {
     if (!LOG.isDebugEnabled()) {
       return;
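The timer body above can be hard to follow inside the diff; the following standalone sketch restates the same three branches against a plain map (InactiveNode, UntrackedCheck and their fields are hypothetical stand-ins, not RM API): a node with no timestamp that has become untracked gets stamped, a node that is tracked again gets its stamp cleared, and a node whose stamp is older than the timeout is dropped.

    import java.util.Iterator;
    import java.util.Map;

    class InactiveNode {           // hypothetical stand-in for RMNode
      long timeStamp;              // 0 means "no removal countdown running"
      boolean untracked;           // stand-in for NodesListManager#isUntrackedNode
    }

    class UntrackedCheck {
      static void sweep(Map<String, InactiveNode> inactive, long timeoutMs) {
        long now = System.currentTimeMillis();
        for (Iterator<Map.Entry<String, InactiveNode>> it =
            inactive.entrySet().iterator(); it.hasNext();) {
          InactiveNode node = it.next().getValue();
          if (node.timeStamp == 0 && node.untracked) {
            node.timeStamp = now;                 // start the removal countdown
          } else if (!node.untracked) {
            node.timeStamp = 0;                   // tracked again: cancel the countdown
          } else if (now - node.timeStamp > timeoutMs) {
            it.remove();                          // untracked long enough: forget the node
          }
        }
      }
    }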
@@ -133,8 +176,20 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
     for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
       if (!isValidNode(nodeId.getHost())) {
+        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+            new RMNodeEvent(nodeId, nodeEventType));
+      }
+    }
+    for (Entry<NodeId, RMNode> entry : rmContext.getInactiveRMNodes().
+        entrySet()) {
+      NodeId nodeId = entry.getKey();
+      RMNode rmNode = entry.getValue();
+      if (nodeId != null && isUntrackedNode(nodeId.getHost())) {
+        rmNode.setTimeStamp(System.currentTimeMillis());
       }
     }
   }
@@ -290,6 +345,26 @@ public boolean isValidNode(String hostName) {
     }
   }
 
+  public boolean isUntrackedNode(String hostName) {
+    String ip = resolver.resolve(hostName);
+    includesFile =
+        conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+            YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
+    synchronized (hostsReader) {
+      Set<String> hostsList = hostsReader.getHosts();
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      if (hostsList.isEmpty()) {
+        return false;
+      }
+      if (!(hostsList.contains(hostName) || hostsList.contains(ip)) &&
+          !(excludeList.contains(hostName) || excludeList.contains(ip))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Provides the currently unusable nodes. Copies it into provided collection.
    * @param unUsableNodes
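In plain terms, isUntrackedNode treats a host as untracked only when an include list is configured and the host (by name or resolved IP) appears in neither the include list nor the exclude list. A tiny standalone restatement of that predicate (the class, method, and parameter names here are illustrative, not part of the patch):

    import java.util.Set;

    final class UntrackedPredicate {
      /** True when the include list is non-empty and the host is in neither list. */
      static boolean isUntracked(String host, String ip,
          Set<String> includeList, Set<String> excludeList) {
        if (includeList.isEmpty()) {
          return false;          // no include list configured: every host is tracked
        }
        boolean included = includeList.contains(host) || includeList.contains(ip);
        boolean excluded = excludeList.contains(host) || excludeList.contains(ip);
        return !included && !excluded;
      }
    }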
@@ -398,8 +473,10 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
     for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
       NodeId nodeId = entry.getKey();
       if (!isValidNode(nodeId.getHost())) {
+        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+            new RMNodeEvent(nodeId, nodeEventType));
       } else {
         // Recommissioning the nodes
         if (entry.getValue().getState() == NodeState.DECOMMISSIONING
@@ -409,6 +486,16 @@
         }
       }
     }
+    for (Entry<NodeId, RMNode> entry : rmContext.getInactiveRMNodes().
+        entrySet()) {
+      if (entry != null) {
+        NodeId nodeId = entry.getKey();
+        RMNode rmNode = entry.getValue();
+        if (isUntrackedNode(nodeId.getHost())) {
+          rmNode.setTimeStamp(System.currentTimeMillis());
+        }
+      }
+    }
   }
 
   /**
@@ -432,8 +519,11 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
   public void refreshNodesForcefully() {
     for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
       if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+        RMNodeEventType nodeEventType = isUntrackedNode(
+            entry.getKey().getHost()) ? RMNodeEventType.SHUTDOWN :
+            RMNodeEventType.DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+            new RMNodeEvent(entry.getKey(), nodeEventType));
       }
     }
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index cc30593..b48ca1e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -86,8 +86,10 @@
         acceptedStates.contains(NodeState.LOST) ||
         acceptedStates.contains(NodeState.REBOOTED)) {
       for (RMNode rmNode : context.getInactiveRMNodes().values()) {
-        if (acceptedStates.contains(rmNode.getState())) {
-          results.add(rmNode);
+        if (rmNode != null) {
+          if (acceptedStates.contains(rmNode.getState())) {
+            results.add(rmNode);
+          }
         }
       }
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index bd24b25..f42c160 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -280,7 +280,8 @@ public RegisterNodeManagerResponse registerNodeManager(
     }
 
     // Check if this node is a 'valid' node
-    if (!this.nodesListManager.isValidNode(host)) {
+    if (!this.nodesListManager.isValidNode(host) ||
+        this.nodesListManager.isUntrackedNode(host)) {
       String message =
           "Disallowed NodeManager from " + host
               + ", Sending SHUTDOWN signal to the NodeManager.";
@@ -398,8 +399,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
 
     // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
     // in decommissioning.
-    if (!this.nodesListManager.isValidNode(nodeId.getHost())
-        && !isNodeInDecommissioning(nodeId)) {
+    if ((!this.nodesListManager.isValidNode(nodeId.getHost())
+        && !isNodeInDecommissioning(nodeId))
+        || this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
       String message =
           "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
               + nodeId.getHost();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 1a172e8..ce7adf2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -168,4 +168,8 @@
   public void updateNodeHeartbeatResponseForContainersDecreasing(
       NodeHeartbeatResponse response);
 
   public List<Container> pullNewlyIncreasedContainers();
+
+  long getTimeStamp();
+
+  void setTimeStamp(long timeStamp);
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 146b0d3..314a251 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -115,6 +115,7 @@
   private long lastHealthReportTime;
   private String nodeManagerVersion;
+  private long timeStamp;
 
   /* Aggregated resource utilization for the containers. */
   private ResourceUtilization containersUtilization;
   /* Resource utilization for the node. */
@@ -248,6 +249,9 @@
       .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
           RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+          RMNodeEventType.SHUTDOWN,
+          new DeactivateNodeTransition(NodeState.SHUTDOWN))
 
       // TODO (in YARN-3223) update resource when container finished.
       .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@@ -733,11 +737,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       List<NMContainerStatus> containers = null;
 
       NodeId nodeId = rmNode.nodeId;
-      if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
+      RMNode previousRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
+      if (previousRMNode != null) {
         // Old node rejoining
-        RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
+        previousRMNode.setTimeStamp(0);
         rmNode.context.getInactiveRMNodes().remove(nodeId);
-        rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
+        rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
       } else {
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
@@ -957,7 +962,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
     }
 
   /**
-   * Put a node in deactivated (decommissioned) status.
+   * Put a node in deactivated (decommissioned or shutdown) status.
    * @param rmNode
    * @param finalState
    */
@@ -970,6 +975,10 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
     LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
         + finalState);
     rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+    if (finalState == NodeState.SHUTDOWN &&
+        rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
+      rmNode.setTimeStamp(System.currentTimeMillis());
+    }
   }
 
   /**
@@ -1318,4 +1327,14 @@ private void handleLogAggregationStatus(
       writeLock.unlock();
     }
   }
+
+  @Override
+  public long getTimeStamp() {
+    return this.timeStamp;
+  }
+
+  @Override
+  public void setTimeStamp(long ts) {
+    this.timeStamp = ts;
+  }
 }
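The net effect of the deactivateNode change above is that only a SHUTDOWN node that is also untracked gets a removal timestamp immediately; an excluded (still tracked) decommissioned node is kept until a later refresh or sweep notices it has become untracked. A hedged restatement of that decision (RemovalCountdownRule and its method are hypothetical, not RM API):

    import org.apache.hadoop.yarn.api.records.NodeState;

    final class RemovalCountdownRule {
      /** Illustrative only: the condition applied before stamping a node. */
      static boolean shouldStartCountdown(NodeState finalState, boolean untracked) {
        // Stamp right away only when the node is SHUTDOWN and absent from both
        // host lists; otherwise leave the timestamp at 0 so the background
        // sweep decides later.
        return finalState == NodeState.SHUTDOWN && untracked;
      }
    }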
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 96207f3..17ea498 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -260,6 +260,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
     public ResourceUtilization getNodeUtilization() {
       return this.nodeUtilization;
     }
+
+    @Override
+    public long getTimeStamp() {
+      return 0;
+    }
+
+    @Override
+    public void setTimeStamp(long timeStamp) {
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e42ed91..f6ee9c7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -126,7 +126,7 @@ public void testDecommissionWithIncludeHosts() throws Exception {
 
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
     assert(metrics != null);
-    int metricCount = metrics.getNumDecommisionedNMs();
+    int metricCount = metrics.getNumShutdownNMs();
 
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@@ -141,12 +141,12 @@
 
     rm.getNodesListManager().refreshNodes(conf);
 
-    checkDecommissionedNMCount(rm, ++metricCount);
+    checkShutdownNMCount(rm, ++metricCount);
 
     nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     Assert
-        .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+        .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
 
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@@ -155,7 +155,181 @@ public void testDecommissionWithIncludeHosts() throws Exception {
     nodeHeartbeat = nm3.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
-        .getNumDecommisionedNMs());
+        .getNumShutdownNMs());
+    rm.stop();
+  }
+
+  /**
+   * Remove a node from all lists and check that it is forgotten.
+   */
+  @Test
+  public void testNodeRemovalNormally() throws Exception {
+    testNodeRemovalUtil(false);
+  }
+
+  @Test
+  public void testNodeRemovalGracefully() throws Exception {
+    testNodeRemovalUtil(true);
+  }
+
+  public void refreshNodesOption(boolean doGraceful, Configuration conf)
+      throws Exception {
+    if (doGraceful) {
+      rm.getNodesListManager().refreshNodesGracefully(conf);
+    } else {
+      rm.getNodesListManager().refreshNodes(conf);
+    }
+  }
+
+  public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
+    Configuration conf = new Configuration();
+    File excludeHostFile = new File(TEMP_DIR + File.separator +
+        "excludeHostFile.txt");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
+
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+    refreshNodesOption(doGraceful, conf);
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+    dispatcher.await();
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    assert(metrics != null);
+
+    // Check that all three nodes joined as NORMAL.
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals("All 3 nodes should be active",
+        metrics.getNumActiveNMs(), 3);
+
+    // Remove nm2 from the include list; it should be shut down and then
+    // removed by the timer.
+    String ip = NetUtils.normalizeHostName("localhost");
+    writeToHostsFile("host1", ip);
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue("Node should not be in active node list",
+        !rm.getRMContext().getRMNodes().containsKey(nm2.getNodeId()));
+
+    RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should be in inactive node list",
+        rmNode.getState(), NodeState.SHUTDOWN);
+    Assert.assertEquals("Active nodes should be 2",
+        metrics.getNumActiveNMs(), 2);
+    Assert.assertEquals("Shutdown nodes should be 1",
+        metrics.getNumShutdownNMs(), 1);
+
+    int nodeRemovalTimeout =
+        conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC);
+    int nodeRemovalInterval =
+        conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_CHK_INTERVAL_MSEC,
+            YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC);
+    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+    Thread.sleep(maxThreadSleeptime);
+
+    rmNode = rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should have been forgotten!",
+        rmNode, null);
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+
+    // Check node removal and re-addition before the timer expires.
+    writeToHostsFile("host1", ip, "host2");
+    refreshNodesOption(doGraceful, conf);
+    nm2 = rm.registerNode("host2:5678", 10240);
+    dispatcher.await();
+    writeToHostsFile("host1", ip);
+    refreshNodesOption(doGraceful, conf);
+    dispatcher.await();
+    rmNode = rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should be shutdown",
+        rmNode.getState(), NodeState.SHUTDOWN);
+    Assert.assertEquals("Active nodes should be 2",
+        metrics.getNumActiveNMs(), 2);
+    Assert.assertEquals("Shutdown nodes should be 1",
+        metrics.getNumShutdownNMs(), 1);
+
+    // Add the node back before the timer expires.
+    Thread.sleep(maxThreadSleeptime - 2000);
+    writeToHostsFile("host1", ip, "host2");
+    refreshNodesOption(doGraceful, conf);
+    nm2 = rm.registerNode("host2:5678", 10240);
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+    Assert.assertEquals("All 3 nodes should be active",
+        metrics.getNumActiveNMs(), 3);
+
+    // Decommission this node and check that the timer does not remove it.
+    writeToHostsFile("host1", "host2", ip);
+    writeToExcludeHostsFile(excludeHostFile, "host2");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
+        .getAbsolutePath());
+    refreshNodesOption(doGraceful, conf);
+    Thread.sleep(100);
+    rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+        rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+        (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+        (rmNode.getState() == NodeState.DECOMMISSIONING));
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+          metrics.getNumDecommisionedNMs(), 1);
+    }
+    Thread.sleep(maxThreadSleeptime);
+    rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+        rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+        (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+        (rmNode.getState() == NodeState.DECOMMISSIONING));
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+          metrics.getNumDecommisionedNMs(), 1);
+    }
+
+    // A decommissioned/ing node that later becomes untracked should be
+    // removed by the timer.
+    writeToHostsFile("host1", ip, "host2");
+    writeToExcludeHostsFile(excludeHostFile, "host2");
+    refreshNodesOption(doGraceful, conf);
+    nm1.nodeHeartbeat(true);
+    //nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    Thread.sleep(maxThreadSleeptime);
+    rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+        rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertNotEquals("Timer for this node was not canceled!",
+        rmNode, null);
+    Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+        (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+        (rmNode.getState() == NodeState.DECOMMISSIONING));
+
+    writeToHostsFile("host1", ip);
+    writeToExcludeHostsFile(excludeHostFile, "");
+    refreshNodesOption(doGraceful, conf);
+    Thread.sleep(maxThreadSleeptime);
+    rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+        rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+    Assert.assertEquals("Node should have been forgotten!",
+        rmNode, null);
+    Assert.assertEquals("Decommissioned nodes should be 0 now",
+        metrics.getNumDecommisionedNMs(), 0);
+    Assert.assertEquals("Shutdown nodes should be 0 now",
+        metrics.getNumShutdownNMs(), 0);
+    Assert.assertEquals("Active nodes should be 2",
+        metrics.getNumActiveNMs(), 2);
+
+    rm.stop();
   }
 
   /**
@@ -234,7 +408,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
     MockNM nm2 = rm.registerNode("host2:5678", 10240);
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
     assert(metrics != null);
-    int initialMetricCount = metrics.getNumDecommisionedNMs();
+    int initialMetricCount = metrics.getNumShutdownNMs();
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertEquals(
         NodeAction.NORMAL,
@@ -247,16 +421,17 @@
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
         .getAbsolutePath());
     rm.getNodesListManager().refreshNodes(conf);
-    checkDecommissionedNMCount(rm, ++initialMetricCount);
+    checkShutdownNMCount(rm, ++initialMetricCount);
     nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertEquals(
-        "Node should not have been decomissioned.",
+        "Node should not have been shut down.",
        NodeAction.NORMAL,
        nodeHeartbeat.getNodeAction());
-    nodeHeartbeat = nm2.nodeHeartbeat(true);
-    Assert.assertEquals("Node should have been decomissioned but is in state" +
-        nodeHeartbeat.getNodeAction(),
-        NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
+    NodeState nodeState = rm.getRMContext().getInactiveRMNodes().get(nm2
+        .getNodeId()).getState();
+    Assert.assertEquals("Node should have been shut down but is in state " +
+        nodeState,
+        NodeState.SHUTDOWN, nodeState);
   }
 
   /**
@@ -1135,8 +1310,6 @@ public void testInvalidNMUnregistration() throws Exception {
     rm.start();
     ResourceTrackerService resourceTrackerService = rm
         .getResourceTrackerService();
-    int shutdownNMsCount = ClusterMetrics.getMetrics()
-        .getNumShutdownNMs();
     int decommisionedNMsCount = ClusterMetrics.getMetrics()
         .getNumDecommisionedNMs();
 
@@ -1161,10 +1334,12 @@
     rm.getNodesListManager().refreshNodes(conf);
     NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
     Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
+    int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
     checkShutdownNMCount(rm, shutdownNMsCount);
-    checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+    checkDecommissionedNMCount(rm, decommisionedNMsCount);
     request.setNodeId(nm1.getNodeId());
     resourceTrackerService.unRegisterNodeManager(request);
+    shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
     checkShutdownNMCount(rm, shutdownNMsCount);
     checkDecommissionedNMCount(rm, decommisionedNMsCount);
 
@@ -1180,8 +1355,9 @@ public void testInvalidNMUnregistration() throws Exception {
     rm.getNodesListManager().refreshNodes(conf);
     request.setNodeId(nm2.getNodeId());
     resourceTrackerService.unRegisterNodeManager(request);
-    checkShutdownNMCount(rm, shutdownNMsCount);
-    checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+    checkShutdownNMCount(rm, ++shutdownNMsCount);
+    checkDecommissionedNMCount(rm, decommisionedNMsCount);
+    rm.stop();
   }
 
   private void writeToHostsFile(String... hosts) throws IOException {
@@ -1204,6 +1380,25 @@
     }
   }
 
+  private void writeToExcludeHostsFile(File excludeFile, String... hosts)
+      throws IOException {
+    if (!excludeFile.exists()) {
+      TEMP_DIR.mkdirs();
+      excludeFile.createNewFile();
+    }
+    FileOutputStream fStream = null;
+    try {
+      fStream = new FileOutputStream(excludeFile);
+      for (int i = 0; i < hosts.length; i++) {
+        fStream.write(hosts[i].getBytes());
+        fStream.write("\n".getBytes());
+      }
+    } finally {
+      if (fStream != null) {
+        IOUtils.closeStream(fStream);
+        fStream = null;
+      }
+    }
+  }
+
   private void checkDecommissionedNMCount(MockRM rm, int count)
       throws InterruptedException {
     int waitCount = 0;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 206edb1..e8df11a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -271,8 +271,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
       RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
       WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
           info.getString("nodeHTTPAddress"));
-      WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
-          .toString(), info.getString("state"));
+      if (rmNode != null) {
+        WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
+            .toString(), info.getString("state"));
+      }
     }
   }
 
@@ -303,8 +305,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception {
     RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
     WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
         info.getString("nodeHTTPAddress"));
-    WebServicesTestUtils.checkStringMatch("state",
-        rmNode.getState().toString(), info.getString("state"));
+    if (rmNode != null) {
+      WebServicesTestUtils.checkStringMatch("state",
+          rmNode.getState().toString(), info.getString("state"));
+    }
   }
 
   @Test
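Not part of the patch: the tests above sleep for interval + timeout (40 s with the defaults). A test that wants a faster sweep could lower both settings before starting the MockRM; a minimal sketch under that assumption (FastNodeRemovalConf is an illustrative name only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class FastNodeRemovalConf {
      /** Returns a configuration with much shorter node-removal settings. */
      public static Configuration create() {
        Configuration conf = new Configuration();
        // Sweep every second and forget untracked inactive nodes after two
        // seconds, instead of the 30 s / 10 s defaults.
        conf.setInt(YarnConfiguration.RM_NODE_REMOVAL_CHK_INTERVAL_MSEC, 1000);
        conf.setInt(YarnConfiguration.RM_NODE_REMOVAL_TIMEOUT_MSEC, 2000);
        return conf;
      }
    }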