diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..cd8f39d 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -199,6 +199,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return null;
}
+
+ @Override
+ public long getTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setTimeStamp(long timeStamp) {
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..12339f5 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -188,4 +188,13 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
+
+ @Override
+ public long getTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setTimeStamp(long timeStamp) {
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 37c81ec..8524409 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -639,6 +639,19 @@ private static void addDeprecatedKeys() {
"NONE";
/**
+ * Interval(msec) for checking removal of shutdown and decommissioned nodes.
+ */
+ public static final String RM_NODE_REMOVAL_CHK_INTERVAL_MSEC = RM_PREFIX
+ + "node-removal.interval-ms";
+ public static final int DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC = 30000;
+ /**
+ * Timeout(msec) for an untracked node to remain in shutdown or decommissioned
+ * state.
+ */
+ public static final String RM_NODE_REMOVAL_TIMEOUT_MSEC = RM_PREFIX
+ + "node-removal.timeout-ms";
+ public static final int DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC = 10000;
+ /**
* RM proxy users' prefix
*/
public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 49cced6..9e019c7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2508,4 +2508,21 @@
yarn.node-labels.fs-store.impl.class
org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore
+
+
+
+ The amount of time(msec.) an inactive node can stay in the RMNodes
+ list after being declared untracked.
+
+ yarn.resourcemanager.node-removal.timeout-ms
+ 10000
+
+
+
+ The interval(msec.) for the timer that checks NM candidates
+ that are due for removal from inactive list.
+
+ yarn.resourcemanager.node-removal.interval-ms
+ 30000
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 4d9922b..c4f2989 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -66,6 +66,7 @@
private String excludesFile;
private Resolver resolver;
+ private Timer removalTimer;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@@ -102,9 +103,51 @@ protected void serviceInit(Configuration conf) throws Exception {
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
+ removalTimer = new Timer("Node Removal Timer");
+ int nodeCheckInterval =
+ conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_CHK_INTERVAL_MSEC,
+ YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC);
+ final int nodeRemovalTimeout =
+ conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC);
+
+ removalTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ for (Map.Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (rmNode.getTimeStamp() == 0 &&
+ isUntrackedNode(rmNode.getHostName())) {
+ rmNode.setTimeStamp(System.currentTimeMillis());
+ } else if (!isUntrackedNode(rmNode.getHostName())) {
+ rmNode.setTimeStamp(0);
+ } else if (System.currentTimeMillis() - rmNode.getTimeStamp() >
+ nodeRemovalTimeout) {
+ RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+ if (result != null) {
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ if (rmNode.getState() == NodeState.SHUTDOWN) {
+ clusterMetrics.decrNumShutdownNMs();
+ } else {
+ clusterMetrics.decrDecommisionedNMs();
+ }
+ }
+ }
+ }
+ }
+ }, nodeCheckInterval, nodeCheckInterval);
+
super.serviceInit(conf);
}
+ @Override
+ public void serviceStop() {
+ removalTimer.cancel();
+ removalTimer.purge();
+ }
+
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@@ -128,8 +171,19 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
+ }
+ }
+
+ for(Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(nodeId.getHost()) && rmNode.getTimeStamp() == 0) {
+ rmNode.setTimeStamp(System.currentTimeMillis());
}
}
}
@@ -365,6 +419,22 @@ private HostsFileReader createHostsFileReader(String includesFile,
return hostsReader;
}
+ public boolean isUntrackedNode(String hostName) {
+ boolean untracked;
+ String ip = resolver.resolve(hostName);
+ includesFile =
+ conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
+ synchronized (hostsReader) {
+ Set hostsList = hostsReader.getHosts();
+ Set excludeList = hostsReader.getExcludedHosts();
+ untracked = !hostsList.isEmpty() &&
+ !hostsList.contains(hostName) && !hostsList.contains(ip) &&
+ !excludeList.contains(hostName) && !excludeList.contains(ip);
+ }
+ return untracked;
+ }
+
/**
* Refresh the nodes gracefully
*
@@ -375,11 +445,13 @@ private HostsFileReader createHostsFileReader(String includesFile,
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
- for (Entry entry:rmContext.getRMNodes().entrySet()) {
+ for (Entry entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING
@@ -389,6 +461,15 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
}
}
}
+
+ for (Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(entry.getKey().getHost()) &&
+ rmNode.getTimeStamp() == 0) {
+ rmNode.setTimeStamp(System.currentTimeMillis());
+ }
+ }
}
/**
@@ -412,8 +493,11 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
public void refreshNodesForcefully() {
for (Entry entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+ RMNodeEventType nodeEventType =
+ isUntrackedNode(entry.getKey().getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index cc30593..015a712 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -86,7 +86,7 @@
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
- if (acceptedStates.contains(rmNode.getState())) {
+ if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 902244b..1967310 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -285,7 +285,8 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Check if this node is a 'valid' node
- if (!this.nodesListManager.isValidNode(host)) {
+ if (!this.nodesListManager.isValidNode(host) ||
+ this.nodesListManager.isUntrackedNode(host)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";
@@ -403,8 +404,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
// in decommissioning.
- if (!this.nodesListManager.isValidNode(nodeId.getHost())
- && !isNodeInDecommissioning(nodeId)) {
+ if ((!this.nodesListManager.isValidNode(nodeId.getHost()) &&
+ !isNodeInDecommissioning(nodeId)) ||
+ this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..1725fa9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -168,4 +168,8 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
NodeHeartbeatResponse response);
public List pullNewlyIncreasedContainers();
+
+ long getTimeStamp();
+
+ void setTimeStamp(long timer);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 8448287..9d800b2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -115,6 +115,7 @@
private long lastHealthReportTime;
private String nodeManagerVersion;
+ private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@@ -248,6 +249,9 @@
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+ RMNodeEventType.SHUTDOWN,
+ new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@@ -754,11 +758,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
List containers = null;
NodeId nodeId = rmNode.nodeId;
- if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
+ RMNode previousRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
+ if (previousRMNode != null) {
// Old node rejoining
- RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
+ previousRMNode.setTimeStamp(0);
rmNode.context.getInactiveRMNodes().remove(nodeId);
- rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
+ rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
@@ -978,7 +983,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
/**
- * Put a node in deactivated (decommissioned) status.
+ * Put a node in deactivated (decommissioned) or shutdown status.
* @param rmNode
* @param finalState
*/
@@ -991,6 +996,10 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+ if (finalState == NodeState.SHUTDOWN &&
+ rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
+ rmNode.setTimeStamp(System.currentTimeMillis());
+ }
}
/**
@@ -1340,4 +1349,14 @@ private void handleLogAggregationStatus(
writeLock.unlock();
}
}
+
+ @Override
+ public long getTimeStamp() {
+ return this.timeStamp;
+ }
+
+ @Override
+ public void setTimeStamp(long ts) {
+ this.timeStamp = ts;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..423536d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -260,6 +260,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
+
+ @Override
+ public long getTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setTimeStamp(long timeStamp) {
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e42ed91..c925185 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -126,7 +126,7 @@ public void testDecommissionWithIncludeHosts() throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
- int metricCount = metrics.getNumDecommisionedNMs();
+ int metricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@@ -141,12 +141,12 @@ public void testDecommissionWithIncludeHosts() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++metricCount);
+ checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
- .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@@ -155,7 +155,8 @@ public void testDecommissionWithIncludeHosts() throws Exception {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
- .getNumDecommisionedNMs());
+ .getNumShutdownNMs());
+ rm.stop();
}
/**
@@ -234,7 +235,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
- int initialMetricCount = metrics.getNumDecommisionedNMs();
+ int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@@ -247,16 +248,16 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++initialMetricCount);
+ checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
- "Node should not have been decomissioned.",
+ "Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
- nodeHeartbeat = nm2.nodeHeartbeat(true);
- Assert.assertEquals("Node should have been decomissioned but is in state" +
- nodeHeartbeat.getNodeAction(),
- NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
+ NodeState nodeState =
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
+ Assert.assertEquals("Node should have been shutdown but is in state" +
+ nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@@ -824,14 +825,14 @@ public void testNodeRegistrationWithMinimumAllocations() throws Exception {
req.setResource(capability);
RegisterNodeManagerResponse response1 =
resourceTrackerService.registerNodeManager(req);
- Assert.assertEquals(NodeAction.SHUTDOWN,response1.getNodeAction());
+ Assert.assertEquals(NodeAction.SHUTDOWN, response1.getNodeAction());
capability.setMemory(2048);
capability.setVirtualCores(1);
req.setResource(capability);
RegisterNodeManagerResponse response2 =
resourceTrackerService.registerNodeManager(req);
- Assert.assertEquals(NodeAction.SHUTDOWN,response2.getNodeAction());
+ Assert.assertEquals(NodeAction.SHUTDOWN, response2.getNodeAction());
capability.setMemory(1024);
capability.setVirtualCores(4);
@@ -845,7 +846,7 @@ public void testNodeRegistrationWithMinimumAllocations() throws Exception {
req.setResource(capability);
RegisterNodeManagerResponse response4 =
resourceTrackerService.registerNodeManager(req);
- Assert.assertEquals(NodeAction.NORMAL,response4.getNodeAction());
+ Assert.assertEquals(NodeAction.NORMAL, response4.getNodeAction());
}
@Test
@@ -1135,8 +1136,6 @@ public void testInvalidNMUnregistration() throws Exception {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
- int shutdownNMsCount = ClusterMetrics.getMetrics()
- .getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@@ -1161,10 +1160,12 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
+ int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
+ shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@@ -1180,8 +1181,190 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
- checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkShutdownNMCount(rm, ++shutdownNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
+ rm.stop();
+ }
+
+ /**
+ * Remove a node from all lists and check if its forgotten
+ */
+ @Test
+ public void testNodeRemovalNormally() throws Exception {
+ testNodeRemovalUtil(false);
+ }
+
+ @Test
+ public void testNodeRemovalGracefully() throws Exception {
+ testNodeRemovalUtil(true);
+ }
+
+ public void refreshNodesOption(boolean doGraceful, Configuration conf)
+ throws Exception {
+ if (doGraceful) {
+ rm.getNodesListManager().refreshNodesGracefully(conf);
+ } else {
+ rm.getNodesListManager().refreshNodes(conf);
+ }
+ }
+
+ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
+ Configuration conf = new Configuration();
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
+
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ rm = new MockRM(conf) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ rm.start();
+
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ dispatcher.await();
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ assert(metrics != null);
+
+ //check all 3 nodes joined in as NORMAL
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm3.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ dispatcher.await();
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+
+ //Remove nm2 from include list, should now be shutdown with timer test
+ String ip = NetUtils.normalizeHostName("localhost");
+ writeToHostsFile("host1", ip);
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+ Assert.assertTrue("Node should not be in active node list",
+ !rm.getRMContext().getRMNodes().containsKey(nm2.getNodeId()));
+
+ RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be in inactive node list",
+ rmNode.getState(), NodeState.SHUTDOWN);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("Shutdown nodes should be 1",
+ metrics.getNumShutdownNMs(), 1);
+
+ int nodeRemovalTimeout =
+ conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ conf.getInt(YarnConfiguration.RM_NODE_REMOVAL_CHK_INTERVAL_MSEC,
+ YarnConfiguration.DEFAULT_RM_NODE_REMOVAL_CHK_INTERVAL_MSEC);
+ long maxThreadSleeptime = nodeRemovalInterval+nodeRemovalTimeout;
+ Thread.sleep(maxThreadSleeptime);
+ rmNode = rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should have been forgotten!",
+ rmNode, null);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+
+ //Check node removal and re-addition before timer expires
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ dispatcher.await();
+ writeToHostsFile("host1", ip);
+ refreshNodesOption(doGraceful, conf);
+ dispatcher.await();
+ rmNode = rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be shutdown",
+ rmNode.getState(), NodeState.SHUTDOWN);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("Shutdown nodes should be 1",
+ metrics.getNumShutdownNMs(), 1);
+
+ //add back the node before timer expires
+ Thread.sleep(maxThreadSleeptime - 2000);
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ dispatcher.await();
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+
+ //Decommission this node, check timer doesn't remove it
+ writeToHostsFile("host1", "host2", ip);
+ writeToExcludeHostsFile(excludeHostFile, "host2");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ Thread.sleep(100);
+ rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ ( rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ metrics.getNumDecommisionedNMs(), 1);
+ }
+ Thread.sleep(maxThreadSleeptime);
+
+ rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ metrics.getNumDecommisionedNMs(), 1);
+ }
+
+ //Test decommed/ing node that transitions to untracked,timer should remove
+ writeToHostsFile("host1", ip, "host2");
+ writeToExcludeHostsFile(excludeHostFile, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ //nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ Thread.sleep(maxThreadSleeptime);
+ rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertNotEquals("Timer for this node was not canceled!",
+ rmNode, null);
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+
+ writeToHostsFile("host1", ip);
+ writeToExcludeHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ Thread.sleep(maxThreadSleeptime);
+ rmNode = doGraceful ? rm.getRMContext().getRMNodes().get(nm2.getNodeId()) :
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should have been forgotten!",
+ rmNode, null);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumDecommisionedNMs(), 0);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+
+ rm.stop();
}
private void writeToHostsFile(String... hosts) throws IOException {
@@ -1204,6 +1387,25 @@ private void writeToHostsFile(String... hosts) throws IOException {
}
}
+ private void writeToExcludeHostsFile(File excludeFile,String... hosts) throws IOException {
+ if (!excludeFile.exists()) {
+ TEMP_DIR.mkdirs();
+ excludeFile.createNewFile();
+ }
+ FileOutputStream fStream = null;
+ try {
+ fStream = new FileOutputStream(excludeFile);
+ for (int i = 0; i < hosts.length; i++) {
+ fStream.write(hosts[i].getBytes());
+ fStream.write("\n".getBytes());
+ }
+ } finally {
+ if (fStream != null) {
+ IOUtils.closeStream(fStream);
+ fStream = null;
+ }
+ }
+ }
private void checkDecommissionedNMCount(MockRM rm, int count)
throws InterruptedException {
int waitCount = 0;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 3fd1fd5..4b6ca12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -272,8 +272,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
- .toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
}
@@ -304,8 +306,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state",
- rmNode.getState().toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
@Test