diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7a4e79c..34a59de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -916,6 +917,8 @@ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode no LOG.info("Unknown application " + containerId.getApplicationAttemptId().getApplicationId() + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 45ccd1c..0f30d9b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -232,20 +232,7 @@ protected Dispatcher createDispatcher() { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - List contsToClean = resp.getContainersToCleanup(); - int cleanedConts = contsToClean.size(); - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -258,26 +245,36 @@ protected Dispatcher createDispatcher() { containerStatuses.put(app.getApplicationId(), containerStatusList); resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts = contsToClean.size(); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. + waitForContainerCleanup(dispatcher, nm1, resp); + + rm.stop(); + } + + protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + NodeHeartbeatResponse resp) throws Exception { + int waitCount; + dispatcher.await(); + List contsToClean = resp.getContainersToCleanup(); + int cleanedConts = contsToClean.size(); waitCount = 0; while (cleanedConts < 1 && waitCount++ < 200) { LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); + resp = nm.nodeHeartbeat(true); dispatcher.await(); contsToClean = resp.getContainersToCleanup(); cleanedConts += contsToClean.size(); } - LOG.info("Got cleanup for " + contsToClean.get(0)); + if (contsToClean.isEmpty()) { + LOG.error("Failed to get any containers to cleanup"); + } else { + LOG.info("Got cleanup for " + contsToClean.get(0)); + } Assert.assertEquals(1, cleanedConts); - - rm.stop(); } - + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) throws Exception { while (true) { @@ -400,6 +397,58 @@ public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { rm2.stop(); } + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws + Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + final DrainDispatcher dispatcher2 = new DrainDispatcher(); + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher2; + } + }; + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // Add unknown container for application unknown to scheduler + NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 + .getApplicationAttemptId(), 2, ContainerState.RUNNING); + + waitForContainerCleanup(dispatcher2, nm1, response); + + rm1.stop(); + rm2.stop(); + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup();