diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..951f5a8 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -199,6 +199,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return null;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..e5013c4 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -188,4 +188,13 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8acee57..66b293f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -648,6 +648,15 @@ public static boolean isAclEnabled(Configuration conf) {
"NONE";
/**
+ * Timeout (in msec) for an untracked node to remain in shutdown or decommissioned
+ * state.
+ */
+ public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
+ RM_PREFIX + "node-removal-untracked.timeout-ms";
+ public static final int
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
+
+ /**
* RM proxy users' prefix
*/
public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index cb3c73a..5194ba2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2780,4 +2780,17 @@
yarn.timeline-service.webapp.rest-csrf.methods-to-ignore
GET,OPTIONS,HEAD
+
+
+
+ The least amount of time (in msec) an inactive (decommissioned or shutdown) node can
+ stay in the nodes list of the resourcemanager after being declared untracked.
+ A node is marked untracked if and only if it is absent from both include and
+ exclude nodemanager lists on the RM. All inactive nodes are checked twice per
+ timeout interval or every 10 minutes, whichever is less, and marked appropriately.
+ The same is done when refreshNodes command (graceful or otherwise) is invoked.
+
+ yarn.resourcemanager.node-removal-untracked.timeout-ms
+ 60000
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index ec2708e..65a9d94 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,6 +69,8 @@
private String excludesFile;
private Resolver resolver;
+ private Timer removalTimer;
+ private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@@ -105,9 +108,56 @@ protected void serviceInit(Configuration conf) throws Exception {
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
+
+ final int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
+ 600000));
+ removalTimer = new Timer("Node Removal Timer");
+
+ removalTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ long now = Time.monotonicNow();
+ for (Map.Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(rmNode.getHostName())) {
+ if (rmNode.getUntrackedTimeStamp() == 0) {
+ rmNode.setUntrackedTimeStamp(now);
+ } else if (now - rmNode.getUntrackedTimeStamp() >
+ nodeRemovalTimeout) {
+ RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+ if (result != null) {
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ if (rmNode.getState() == NodeState.SHUTDOWN) {
+ clusterMetrics.decrNumShutdownNMs();
+ } else {
+ clusterMetrics.decrDecommisionedNMs();
+ }
+ LOG.info("Removed "+result.getHostName() +
+ " from inactive nodes list");
+ }
+ }
+ } else {
+ rmNode.setUntrackedTimeStamp(0);
+ }
+ }
+ }
+ }, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
+
super.serviceInit(conf);
}
+ @Override
+ public void serviceStop() {
+ removalTimer.cancel();
+ }
+
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@@ -131,10 +181,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
}
}
+ updateInactiveNodes();
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
@@ -172,6 +225,16 @@ private void setDecomissionedNMs() {
}
@VisibleForTesting
+ public int getNodeRemovalCheckInterval() {
+ return nodeRemovalCheckInterval;
+ }
+
+ @VisibleForTesting
+ public void setNodeRemovalCheckInterval(int interval) {
+ this.nodeRemovalCheckInterval = interval;
+ }
+
+ @VisibleForTesting
public Resolver getResolver() {
return resolver;
}
@@ -374,6 +437,33 @@ private HostsFileReader createHostsFileReader(String includesFile,
return hostsReader;
}
+ private void updateInactiveNodes() {
+ long now = Time.monotonicNow();
+ for(Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(nodeId.getHost()) &&
+ rmNode.getUntrackedTimeStamp() == 0) {
+ rmNode.setUntrackedTimeStamp(now);
+ }
+ }
+ }
+
+ public boolean isUntrackedNode(String hostName) {
+ boolean untracked;
+ String ip = resolver.resolve(hostName);
+
+ synchronized (hostsReader) {
+ Set hostsList = hostsReader.getHosts();
+ Set excludeList = hostsReader.getExcludedHosts();
+ untracked = !hostsList.isEmpty() &&
+ !hostsList.contains(hostName) && !hostsList.contains(ip) &&
+ !excludeList.contains(hostName) && !excludeList.contains(ip);
+ }
+ return untracked;
+ }
+
/**
* Refresh the nodes gracefully
*
@@ -384,11 +474,13 @@ private HostsFileReader createHostsFileReader(String includesFile,
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
- for (Entry entry:rmContext.getRMNodes().entrySet()) {
+ for (Entry entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@@ -397,6 +489,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
}
}
}
+ updateInactiveNodes();
}
/**
@@ -420,8 +513,11 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
public void refreshNodesForcefully() {
for (Entry entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+ RMNodeEventType nodeEventType =
+ isUntrackedNode(entry.getKey().getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e19d55e..1318d58 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -87,7 +87,7 @@
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
- if (acceptedStates.contains(rmNode.getState())) {
+ if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index b0bc565..238e5bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -320,7 +320,8 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Check if this node is a 'valid' node
- if (!this.nodesListManager.isValidNode(host)) {
+ if (!this.nodesListManager.isValidNode(host) ||
+ this.nodesListManager.isUntrackedNode(host)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";
@@ -451,8 +452,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
// in decommissioning.
- if (!this.nodesListManager.isValidNode(nodeId.getHost())
- && !isNodeInDecommissioning(nodeId)) {
+ if ((!this.nodesListManager.isValidNode(nodeId.getHost()) &&
+ !isNodeInDecommissioning(nodeId)) ||
+ this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..e599576 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -168,4 +168,8 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
NodeHeartbeatResponse response);
public List pullNewlyIncreasedContainers();
+
+ long getUntrackedTimeStamp();
+
+ void setUntrackedTimeStamp(long timer);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 5f8317e..4260861 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -120,6 +121,7 @@
private long lastHealthReportTime;
private String nodeManagerVersion;
+ private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@@ -259,6 +261,9 @@
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+ RMNodeEventType.SHUTDOWN,
+ new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@@ -346,6 +351,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
+ this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0);
@@ -1011,7 +1017,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
/**
- * Put a node in deactivated (decommissioned) status.
+ * Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode
* @param finalState
*/
@@ -1028,6 +1034,10 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+ if (finalState == NodeState.SHUTDOWN &&
+ rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
+ rmNode.setUntrackedTimeStamp(Time.monotonicNow());
+ }
}
/**
@@ -1383,4 +1393,14 @@ private void handleLogAggregationStatus(
public Resource getOriginalTotalCapability() {
return this.originalTotalCapability;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return this.timeStamp;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long ts) {
+ this.timeStamp = ts;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..921b18e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -260,6 +260,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 9ed79a3..dd37b67 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -31,6 +31,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@@ -48,8 +50,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
@@ -141,12 +141,12 @@ public void testDecommissionWithIncludeHosts() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++metricCount);
+ checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
- .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@@ -155,7 +155,8 @@ public void testDecommissionWithIncludeHosts() throws Exception {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
- .getNumDecommisionedNMs());
+ .getNumShutdownNMs());
+ rm.stop();
}
/**
@@ -228,7 +229,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
- int initialMetricCount = metrics.getNumDecommisionedNMs();
+ int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@@ -241,16 +242,16 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++initialMetricCount);
+ checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
- "Node should not have been decomissioned.",
+ "Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
- nodeHeartbeat = nm2.nodeHeartbeat(true);
- Assert.assertEquals("Node should have been decomissioned but is in state" +
- nodeHeartbeat.getNodeAction(),
- NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
+ NodeState nodeState =
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
+ Assert.assertEquals("Node should have been shutdown but is in state" +
+ nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@@ -1123,8 +1124,6 @@ public void testInvalidNMUnregistration() throws Exception {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
- int shutdownNMsCount = ClusterMetrics.getMetrics()
- .getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@@ -1149,10 +1148,12 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
+ int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
+ shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@@ -1168,8 +1169,9 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
- checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkShutdownNMCount(rm, ++shutdownNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
+ rm.stop();
}
@Test(timeout = 30000)
@@ -1304,6 +1306,186 @@ public void testIncorrectRecommission() throws Exception {
rm.stop();
}
+ /**
+ * Remove a node from all lists and check if it's forgotten.
+ */
+ @Test
+ public void testNodeRemovalNormally() throws Exception {
+ testNodeRemovalUtil(false);
+ }
+
+ @Test
+ public void testNodeRemovalGracefully() throws Exception {
+ testNodeRemovalUtil(true);
+ }
+
+ public void refreshNodesOption(boolean doGraceful, Configuration conf)
+ throws Exception {
+ if (doGraceful) {
+ rm.getNodesListManager().refreshNodesGracefully(conf);
+ } else {
+ rm.getNodesListManager().refreshNodes(conf);
+ }
+ }
+
+ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
+ Configuration conf = new Configuration();
+ int timeoutValue = 500;
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
+ conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ timeoutValue);
+ CountDownLatch latch = new CountDownLatch(1);
+ rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ RMContext rmContext = rm.getRMContext();
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ assert (metrics != null);
+
+ //check all 3 nodes joined in as NORMAL
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm3.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ rm.drainEvents();
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+
+ //Remove nm2 from include list, should now be shutdown with timer test
+ String ip = NetUtils.normalizeHostName("localhost");
+ writeToHostsFile("host1", ip);
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertTrue("Node should not be in active node list",
+ !rmContext.getRMNodes().containsKey(nm2.getNodeId()));
+
+ RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be in inactive node list",
+ rmNode.getState(), NodeState.SHUTDOWN);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("Shutdown nodes should be 1",
+ metrics.getNumShutdownNMs(), 1);
+
+ int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+ long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+ rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should have been forgotten!",
+ rmNode, null);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+
+ //Check node removal and re-addition before timer expires
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ rm.drainEvents();
+ writeToHostsFile("host1", ip);
+ refreshNodesOption(doGraceful, conf);
+ rm.drainEvents();
+ rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be shutdown",
+ rmNode.getState(), NodeState.SHUTDOWN);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("Shutdown nodes should be 1",
+ metrics.getNumShutdownNMs(), 1);
+
+ //add back the node before timer expires
+ latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+
+ //Decommission this node, check timer doesn't remove it
+ writeToHostsFile("host1", "host2", ip);
+ writeToHostsFile(excludeHostFile, "host2");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ rm.drainEvents();
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ metrics.getNumDecommisionedNMs(), 1);
+ }
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ metrics.getNumDecommisionedNMs(), 1);
+ }
+
+ //Test decommed/ing node that transitions to untracked, timer should remove
+ writeToHostsFile("host1", ip, "host2");
+ writeToHostsFile(excludeHostFile, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ //nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertNotEquals("Timer for this node was not canceled!",
+ rmNode, null);
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+
+ writeToHostsFile("host1", ip);
+ writeToHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should have been forgotten!",
+ rmNode, null);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumDecommisionedNMs(), 0);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+
+ rm.stop();
+ }
+
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 3fd1fd5..4b6ca12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -272,8 +272,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
- .toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
}
@@ -304,8 +306,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state",
- rmNode.getState().toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
@Test