diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d1ccecb..e76530f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1170,12 +1170,23 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { NodeState initialState = rmNode.getState(); boolean isNodeDecommissioning = initialState.equals(NodeState.DECOMMISSIONING); + if (isNodeDecommissioning) { + List keepAliveApps = statusEvent.getKeepAliveAppIds(); + // no running (and keeping alive) app on this node, get it + // decommissioned. + if (rmNode.runningApplications.isEmpty() && + (keepAliveApps == null || keepAliveApps.isEmpty())) { + RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); + return NodeState.DECOMMISSIONED; + } + } + if (!remoteNodeHealthStatus.getIsNodeHealthy()) { LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " + remoteNodeHealthStatus.getHealthReport()); // if a node in decommissioning receives an unhealthy report, - // it will keep decommissioning. + // it will stay in decommissioning. if (isNodeDecommissioning) { return NodeState.DECOMMISSIONING; } else { @@ -1349,7 +1360,7 @@ private void handleContainerStatus(List containerStatuses) { + " is the first container get launched for application " + containerAppId); } - runningApplications.add(containerAppId); + handleRunningAppOnNode(this, context, containerAppId, nodeId); } // Process running containers diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index e82b93c..7acc862 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -73,6 +74,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -279,16 +281,17 @@ public void testContainerUpdate() throws InterruptedException{ NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); node2.handle(new RMNodeStartedEvent(null, null, null)); - + + ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); + ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(0, 0), 0), 0); + BuilderUtils.newApplicationAttemptId(app0, 0), 0); ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(1, 1), 1), 1); + BuilderUtils.newApplicationAttemptId(app1, 1), 1); ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(1, 1), 1), 2); + BuilderUtils.newApplicationAttemptId(app1, 1), 2); + rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class)); + rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class)); RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null); @@ -652,6 +655,7 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() { NodeId nodeId = node.getNodeID(); ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1); + rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class)); // Create a running container ContainerId runningContainerId = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -919,16 +923,23 @@ public void testResourceUpdateOnRebootedNode() { } // Test unhealthy report on a decommissioning node will make it - // keep decommissioning. + // keep decommissioning as long as there's a running or keep alive app. + // Otherwise, it will go to decommissioned @Test public void testDecommissioningUnhealthy() { RMNodeImpl node = getDecommissioningNode(); NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); + List keepAliveApps = new ArrayList<>(); + keepAliveApps.add(BuilderUtils.newApplicationId(1, 1)); NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, - new ArrayList(), null, status, null, null, null); + new ArrayList(), keepAliveApps, status, null, null, + null); node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + nodeStatus.setKeepAliveApplications(null); + node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); + Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); } @Test @@ -951,6 +962,7 @@ public void testContainerExpire() throws Exception { ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class)); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L); ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L); AllocationExpirationInfo expirationInfo1 = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index aa5b336..0a11729 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -299,6 +299,8 @@ public void testGracefulDecommissionWithApp() throws Exception { RMApp app = rm.submitApp(2000); MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); // Graceful decommission host1 and host3 writeToHostsFile("host1", "host3");