diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 90d7b51..11e292a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.HostsFileReader; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import com.google.common.annotations.VisibleForTesting; @@ -123,6 +126,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException, .getConfigurationInputStream(this.conf, excludesFile)); printConfiguredHosts(); } + + for (NodeId nodeId: rmContext.getRMNodes().keySet()) { + if (!isValidNode(nodeId.getHost())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } } private void 
setDecomissionedNMsMetrics() { @@ -208,7 +218,6 @@ private void disableHostsFileReader(Exception ex) { } } - @VisibleForTesting public HostsFileReader getHostsReader() { return this.hostsReader; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 4222888..63316e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -269,55 +269,64 @@ public RegisterNodeManagerResponse registerNodeManager( } } - // Check if this node is a 'valid' node - if (!this.nodesListManager.isValidNode(host)) { - String message = - "Disallowed NodeManager from " + host - + ", Sending SHUTDOWN signal to the NodeManager."; - LOG.info(message); - response.setDiagnosticsMessage(message); - response.setNodeAction(NodeAction.SHUTDOWN); - return response; - } - - // Check if this node has minimum allocations - if (capability.getMemory() < minAllocMb - || capability.getVirtualCores() < minAllocVcores) { - String message = - "NodeManager from " + host - + " doesn't satisfy minimum allocations, Sending SHUTDOWN" - + " signal to the NodeManager."; - LOG.info(message); - response.setDiagnosticsMessage(message); - response.setNodeAction(NodeAction.SHUTDOWN); - return response; - } - - response.setContainerTokenMasterKey(containerTokenSecretManager - .getCurrentKey()); - response.setNMTokenMasterKey(nmTokenSecretManager - .getCurrentKey()); + // synchronize on hostsReader to make sure 
NodesListManager#refreshNodes + // is run either before the node is checked or after RMNode is created. + // If it is run before the node is checked, NodesListManager#isValidNode + // will return false for the node to be decommissioned. + // If it is run after RMNode is created, the newly created RMNode will + // be decommissioned by NodesListManager#refreshNodes immediately. + synchronized (this.nodesListManager.getHostsReader()) { + // Check if this node is a 'valid' node + if (!this.nodesListManager.isValidNode(host)) { + String message = + "Disallowed NodeManager from " + host + + ", Sending SHUTDOWN signal to the NodeManager."; + LOG.info(message); + response.setDiagnosticsMessage(message); + response.setNodeAction(NodeAction.SHUTDOWN); + return response; + } - RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion); + // Check if this node has minimum allocations + if (capability.getMemory() < minAllocMb + || capability.getVirtualCores() < minAllocVcores) { + String message = + "NodeManager from " + host + + " doesn't satisfy minimum allocations, Sending SHUTDOWN" + + " signal to the NodeManager."; + LOG.info(message); + response.setDiagnosticsMessage(message); + response.setNodeAction(NodeAction.SHUTDOWN); + return response; + } - RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); - if (oldNode == null) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications())); - } else { - LOG.info("Reconnect from the node at: " + host); - this.nmLivelinessMonitor.unregister(nodeId); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeReconnectEvent(nodeId, rmNode, - request.getRunningApplications())); + response.setContainerTokenMasterKey(containerTokenSecretManager + .getCurrentKey()); + response.setNMTokenMasterKey(nmTokenSecretManager + .getCurrentKey()); + + RMNode 
rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, + httpPort, resolve(host), capability, nodeManagerVersion); + + RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, + rmNode); + if (oldNode == null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), + request.getRunningApplications())); + } else { + LOG.info("Reconnect from the node at: " + host); + this.nmLivelinessMonitor.unregister(nodeId); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeReconnectEvent(nodeId, rmNode, + request.getRunningApplications())); + } + // On every node manager register we will be clearing NMToken keys if + // present for any running application. + this.nmTokenSecretManager.removeNodeKey(nodeId); + this.nmLivelinessMonitor.register(nodeId); } - // On every node manager register we will be clearing NMToken keys if - // present for any running application. - this.nmTokenSecretManager.removeNodeKey(nodeId); - this.nmLivelinessMonitor.register(nodeId); - + // Handle received container status, this should be processed after new // RMNode inserted if (!rmContext.isWorkPreservingRecoveryEnabled()) { @@ -349,15 +358,25 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeStatus remoteNodeStatus = request.getNodeStatus(); /** * Here is the node heartbeat sequence... - * 1. Check if it's a registered node - * 2. Check if it's a valid (i.e. not excluded) node - * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat + * 1. Check if it's a valid (i.e. not excluded) node + * 2. Check if it's a registered node + * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 4. Send healthStatus to RMNode */ NodeId nodeId = remoteNodeStatus.getNodeId(); - // 1. Check if it's a registered node + // 1. Check if it's a valid (i.e. 
not excluded) node + if (!this.nodesListManager.isValidNode(nodeId.getHost())) { + String message = + "Disallowed NodeManager nodeId: " + nodeId + " hostname: " + + nodeId.getHost(); + LOG.info(message); + shutDown.setDiagnosticsMessage(message); + return shutDown; + } + + // 2. Check if it's a registered node RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { /* node does not exist */ @@ -370,18 +389,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); - // 2. Check if it's a valid (i.e. not excluded) node - if (!this.nodesListManager.isValidNode(rmNode.getHostName())) { - String message = - "Disallowed NodeManager nodeId: " + nodeId + " hostname: " - + rmNode.getNodeAddress(); - LOG.info(message); - shutDown.setDiagnosticsMessage(message); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); - return shutDown; - } - // 3. Check if it's a 'fresh' heartbeat i.e. 
not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 077f464..28d1d63 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -130,17 +130,17 @@ public void testDecommissionWithIncludeHosts() throws Exception { rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, ++metricCount); + nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert - .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); nodeHeartbeat = nm2.nodeHeartbeat(true); Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN .equals(nodeHeartbeat.getNodeAction())); - checkDecommissionedNMCount(rm, ++metricCount); - nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() @@ -185,6 +185,8 @@ protected Dispatcher createDispatcher() { rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, metricCount + 2); + nodeHeartbeat = nm1.nodeHeartbeat(true); 
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm2.nodeHeartbeat(true); @@ -195,7 +197,7 @@ protected Dispatcher createDispatcher() { Assert.assertTrue("The decommisioned metrics are not updated", NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); dispatcher.await(); - checkDecommissionedNMCount(rm, metricCount + 2); + writeToHostsFile(""); rm.getNodesListManager().refreshNodes(conf); @@ -234,6 +236,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception { conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, ++initialMetricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( "Node should not have been decomissioned.", @@ -243,7 +246,6 @@ public void testAddNewIncludePathToConfiguration() throws Exception { Assert.assertEquals("Node should have been decomissioned but is in state" + nodeHeartbeat.getNodeAction(), NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); - checkDecommissionedNMCount(rm, ++initialMetricCount); } /** @@ -271,6 +273,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception { conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, ++initialMetricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( "Node should not have been decomissioned.", @@ -280,7 +283,6 @@ public void testAddNewExcludePathToConfiguration() throws Exception { Assert.assertEquals("Node should have been decomissioned but is in state" + nodeHeartbeat.getNodeAction(), NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); - checkDecommissionedNMCount(rm, ++initialMetricCount); } @Test