diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 61a0349..0de556b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -312,9 +312,12 @@ public RegisterNodeManagerResponse registerNodeManager( } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeReconnectEvent(nodeId, rmNode, - request.getRunningApplications())); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeReconnectEvent(nodeId, rmNode, request + .getRunningApplications(), request.getNMContainerStatuses())); } // On every node manager register we will be clearing NMToken keys if // present for any running application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1bc98b2..13efe96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.RemovalCause; /** * This class is used to keep track of all the applications/containers @@ -601,6 +602,35 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.httpAddress = newNode.getHttpAddress(); rmNode.totalCapability = newNode.getTotalCapability(); + // Filter the map to only obtain just launched containers and finished + // containers. + List newlyLaunchedContainers = + new ArrayList(); + List completedContainers = + new ArrayList(); + for (NMContainerStatus remoteContainer : reconnectEvent + .getNMContainerStatuses()) { + ContainerId containerId = remoteContainer.getContainerId(); + + // Process running containers + if (remoteContainer.getContainerState() == ContainerState.RUNNING) { + if (!rmNode.launchedContainers.contains(containerId)) { + // Just launched container. RM knows about it the first time. + rmNode.launchedContainers.add(containerId); + ContainerStatus cStatus = createContainerStatus(remoteContainer); + newlyLaunchedContainers.add(cStatus); + } + } else { + + ContainerStatus cStatus = createContainerStatus(remoteContainer); + completedContainers.add(cStatus); + } + } + if (newlyLaunchedContainers.size() != 0 + || completedContainers.size() != 0) { + rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo( + newlyLaunchedContainers, completedContainers)); + } // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); } @@ -622,6 +652,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + + private ContainerStatus createContainerStatus( + NMContainerStatus remoteContainer) { + ContainerStatus cStatus = + ContainerStatus.newInstance(remoteContainer.getContainerId(), + remoteContainer.getContainerState(), + remoteContainer.getDiagnostics(), + remoteContainer.getContainerExitStatus()); + return cStatus; + } } public static class UpdateNodeResourceWhenRunningTransition diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java index ebbac9a..6df7bd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java @@ -22,16 +22,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public class RMNodeReconnectEvent extends RMNodeEvent { private RMNode reconnectedNode; private List runningApplications; + private List containerStatuses; public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode, - List runningApps) { + List runningApps, List containerReports) { super(nodeId, RMNodeEventType.RECONNECTED); reconnectedNode = newNode; runningApplications = runningApps; + this.containerStatuses = containerReports; } public RMNode getReconnectedNode() { @@ -41,4 +44,8 @@ public RMNode getReconnectedNode() { public List getRunningApplications() { return runningApplications; } + + public List getNMContainerStatuses() { + return this.containerStatuses; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 891130f..3f870a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.log4j.Level; @@ -478,6 +480,82 @@ public void testAppCleanupWhenNMReconnects() throws Exception { rm1.stop(); } + @Test + public void testAppCleanupWhenNMRstarts() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + int nmMemory = 8192; + MockNM nm1 = + new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app0 = rm1.submitApp(1024); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + int noOfContainers = 1; + List allocateAndWaitForContainers = + am0.allocateAndWaitForContainers(noOfContainers, 2048, nm1); + Assert.assertEquals(noOfContainers, allocateAndWaitForContainers.size()); + + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), + allocateAndWaitForContainers.get(0).getId().getContainerId(), + ContainerState.RUNNING); + + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + ResourceScheduler rs = rm1.getRMContext().getScheduler(); + int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB(); + Assert.assertEquals(3072, allocatedMB); + + List nMContainerStatusForApp = + createNMContainerStatusForApp(am0); + nm1.registerNode(nMContainerStatusForApp, + Arrays.asList(app0.getApplicationId())); + + int counter = 0; + while (rs.getRootQueueMetrics().getAllocatedMB() == allocatedMB) { + nm1.nodeHeartbeat(true); + + Thread.sleep(100); + if (counter++ == 50) { + break; + } + + } + Assert.assertEquals(1024, rs.getRootQueueMetrics().getAllocatedMB()); + + rm1.stop(); + } + + public static List createNMContainerStatusForApp(MockAM am) { + List list = new ArrayList(); + NMContainerStatus amContainer = + createNMContainerStatus(am.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + NMContainerStatus completedContainer = + createNMContainerStatus(am.getApplicationAttemptId(), 2, + ContainerState.COMPLETE); + list.add(amContainer); + list.add(completedContainer); + return list; + } + + public static NMContainerStatus createNMContainerStatus( + ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { + ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, containerState, + Resource.newInstance(1024, 1), "recover container", 0, + Priority.newInstance(0), 0); + return containerReport; + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index d877e25..c6da3fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -540,7 +540,7 @@ public void testReconnect() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null)); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null)); Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -614,7 +614,7 @@ public void testReconnnectUpdate() { Assert.assertEquals(nmVersion1, node.getNodeManagerVersion()); RMNodeImpl reconnectingNode = getRunningNode(nmVersion2); node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode, - null)); + null, null)); Assert.assertEquals(nmVersion2, node.getNodeManagerVersion()); } } -- 1.8.4