diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 72ee7db..ab56bb9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -273,6 +273,19 @@ public synchronized void recoverContainersOnNode( SchedulerApplicationAttempt schedulerAttempt = schedulerApp.getCurrentAppAttempt(); + if (!rmApp.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + // Do not recover containers for stopped attempt or previous attempt. 
+ if (schedulerAttempt.isStopped() + || !schedulerAttempt.getApplicationAttemptId().equals( + container.getContainerId().getApplicationAttemptId())) { + LOG.info("Skip recovering container " + container + + " for already stopped attempt."); + killOrphanContainerOnNode(nm, container); + continue; + } + } + // create container RMContainer rmContainer = recoverAndCreateContainer(container, nm); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index df64d4c..d6af0d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -513,6 +513,19 @@ public void testAMfailedBetweenRMRestart() throws Exception { // just-recovered containers. assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8192, rm2.getResourceTrackerService()); + NMContainerStatus previousAttemptContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 4, + ContainerState.RUNNING); + nm2.registerNode(Arrays.asList(previousAttemptContainer), null); + // Wait for RM to settle down on recovering containers. + Thread.sleep(3000); + // Containers from the previous failed attempt should not be recovered. 
+ assertNull(scheduler.getRMContainer(previousAttemptContainer.getContainerId())); } // Apps already completed before RM restart. Restarted RM scheduler should not