diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 2451030..df9f34b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -668,6 +668,7 @@ protected boolean finish() { + ", completed=" + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed=" + numFailedContainers.get(); + LOG.info(appMessage); success = false; } try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index d75a871..fbcb7d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -687,20 +687,7 @@ public float getProgress() { // A new allocate means the AM received the previously sent // finishedContainers. We can ack this to NM now - for (NodeId nodeId:finishedContainersSentToAM.keySet()) { - - // Clear and get current values - List currentSentContainers = - finishedContainersSentToAM - .put(nodeId, new ArrayList()); - List containerIdList = new ArrayList - (currentSentContainers.size()); - for (ContainerStatus containerStatus:currentSentContainers) { - containerIdList.add(containerStatus.getContainerId()); - } - eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent( - nodeId, containerIdList)); - } + sendFinishedContainersToNM(); // Mark every containerStatus as being sent to AM though we may return // only the ones that belong to the current attempt @@ -1592,14 +1579,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); - // Add all finished containers so that they can be acked to NM - addJustFinishedContainer(appAttempt, containerFinishedEvent); - // Is this container the AmContainer? If the finished container is same as // the AMContainer, AppAttempt fails if (appAttempt.masterContainer != null && appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { + appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent); // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, @@ -1607,10 +1592,46 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.FINAL_SAVING; } + // Add all finished containers so that they can be acked to NM + addJustFinishedContainer(appAttempt, containerFinishedEvent); return this.currentState; } } + + // Ack NM to remove finished containers from context. + private void sendFinishedContainersToNM() { + for (NodeId nodeId : finishedContainersSentToAM.keySet()) { + + // Clear and get current values + List currentSentContainers = + finishedContainersSentToAM.put(nodeId, + new ArrayList()); + List containerIdList = + new ArrayList(currentSentContainers.size()); + for (ContainerStatus containerStatus : currentSentContainers) { + containerIdList.add(containerStatus.getContainerId()); + } + eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, + containerIdList)); + } + } + + // Add am container to the list so that am container instance will be + // removed from NMContext. + private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, + RMAppAttemptContainerFinishedEvent containerFinishedEvent) { + NodeId nodeId = containerFinishedEvent.getNodeId(); + finishedContainersSentToAM.putIfAbsent(nodeId, + new ArrayList()); + appAttempt.finishedContainersSentToAM.get(nodeId).add( + containerFinishedEvent.getContainerStatus()); + if (!appAttempt.getSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + appAttempt.sendFinishedContainersToNM(); + } + } + private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent @@ -1661,16 +1682,16 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); - // Add all finished containers so that they can be acked to NM. - addJustFinishedContainer(appAttempt, containerFinishedEvent); - // Is this container the ApplicationMaster container? if (appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { new FinalTransition(RMAppAttemptState.FINISHED).transition( appAttempt, containerFinishedEvent); + appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent); return RMAppAttemptState.FINISHED; } + // Add all finished containers so that they can be acked to NM. + addJustFinishedContainer(appAttempt, containerFinishedEvent); return RMAppAttemptState.FINISHING; } @@ -1686,14 +1707,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); - // Add all finished containers so that they can be acked to NM. - addJustFinishedContainer(appAttempt, containerFinishedEvent); - // If this is the AM container, it means the AM container is finished, // but we are not yet acknowledged that the final state has been saved. // Thus, we still return FINAL_SAVING state here. if (appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { + appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent); + if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED) || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) { // ignore Container_Finished Event if we were supposed to reach @@ -1708,6 +1728,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED); return; } + + // Add all finished containers so that they can be acked to NM. + addJustFinishedContainer(appAttempt, containerFinishedEvent); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index ba592fc..fcb4e45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -98,9 +98,6 @@ public void testAMRestartWithExistingContainers() throws Exception { Thread.sleep(200); } - ContainerId amContainerId = ContainerId.newInstance(am1 - .getApplicationAttemptId(), 1); - // launch the 2nd container, for testing running container transferred. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); ContainerId containerId2 = @@ -199,15 +196,11 @@ public void testAMRestartWithExistingContainers() throws Exception { // completed containerId4 is also transferred to the new attempt. RMAppAttempt newAttempt = app1.getRMAppAttempt(am2.getApplicationAttemptId()); - // 4 containers finished, acquired/allocated/reserved/completed + AM - // container. - waitForContainersToFinish(5, newAttempt); + // 4 containers finished, acquired/allocated/reserved/completed. + waitForContainersToFinish(4, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = - false, container6Exists = false, amContainerExists = false; + false, container6Exists = false; for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { - if(status.getContainerId().equals(amContainerId)) { - amContainerExists = true; - } if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. @@ -227,11 +220,8 @@ public void testAMRestartWithExistingContainers() throws Exception { container6Exists = true; } } - Assert.assertTrue(amContainerExists); - Assert.assertTrue(container3Exists); - Assert.assertTrue(container4Exists); - Assert.assertTrue(container5Exists); - Assert.assertTrue(container6Exists); + Assert.assertTrue(container3Exists && container4Exists && container5Exists + && container6Exists); // New SchedulerApplicationAttempt also has the containers info. rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); @@ -250,14 +240,14 @@ public void testAMRestartWithExistingContainers() throws Exception { // all 4 normal containers finished. System.out.println("New attempt's just finished containers: " + newAttempt.getJustFinishedContainers()); - waitForContainersToFinish(6, newAttempt); + waitForContainersToFinish(5, newAttempt); rm1.stop(); } private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; - while (attempt.getJustFinishedContainers().size() < expectedNum + while (attempt.getJustFinishedContainers().size() != expectedNum && count < 500) { Thread.sleep(100); count++;