diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d1737a..6e02315 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1236,7 +1236,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // finished containers so that they can be acked to NM, // but when pulling finished container we will check this flag again. ((RMAppAttemptImpl) app.currentAttempt) - .transferStateFromPreviousAttempt(oldAttempt); + .transferStateFromAttempt(oldAttempt); return initialState; } else { if (numberOfFailure >= app.maxAppAttempts) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1be1727..c1649d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -851,7 +851,7 @@ public void recover(RMState state) { attemptState.getMemorySeconds(),attemptState.getVcoreSeconds()); } - public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { + public void transferStateFromAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainersReference(); this.finishedContainersSentToAM = attempt.getFinishedContainersSentToAMReference(); @@ -1051,6 +1051,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt.progress = 1.0f; RMApp rmApp =appAttempt.rmContext.getRMApps().get( appAttempt.getAppAttemptId().getApplicationId()); + + if (appAttempt.submissionContext + .getKeepContainersAcrossApplicationAttempts() + && !appAttempt.submissionContext.getUnmanagedAM() + && rmApp.getCurrentAppAttempt() != appAttempt) { + appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt()); + } // We will replay the final attempt only if last attempt is in final // state but application is not in final state. if (rmApp.getCurrentAppAttempt() == appAttempt diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 3033496..2886968 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1011,4 +1011,64 @@ public void addApplicationSync(ApplicationId applicationId, rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); } + + @Test(timeout = 20000) + public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit app with keepContainersAcrossApplicationAttempts true + RMApp app0 = rm1.submitApp(200, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, + null, null, true, true, false, null, 0, null, true); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + am0.allocate("127.0.0.1", 1000, 2, new ArrayList()); + nm1.nodeHeartbeat(true); + List conts = am0.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + conts.addAll(am0.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + + // am failed,and relaunch it + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + // rm failover + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // container launched by first am completed + NMContainerStatus amContainer = + TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + NMContainerStatus completedContainer= + TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 2, + ContainerState.COMPLETE); + NMContainerStatus runningContainer = + TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer), null); + Thread.sleep(200); + + // check whether current am could get containerCompleteMsg + RMApp recoveredApp0 = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp0.getCurrentAppAttempt(); + assertEquals(1,loadedAttempt1.getJustFinishedContainers().size()); + } + }