diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 2d6de3750e9..c2496e2bd47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -179,7 +179,7 @@ private long launchAMEndTime = 0; private long scheduledTime = 0; private long containerAllocatedTime = 0; - private boolean nonWorkPreservingAMContainerFinished = false; + private boolean amContainerFinished = false; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -1498,6 +1498,13 @@ public void transition(RMAppAttemptImpl appAttempt, case KILLED: { appAttempt.invalidateAMHostAndPort(); + + // If app is killed by user request, amContainerFinished is not called. + // Thus, there is no chance to clear existing justFinishedContainers and finishedContainersSentToAM + // Clear them in here before proceeding next step. + appAttempt.sendFinishedContainersToNM(appAttempt.finishedContainersSentToAM); + appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); + // Forward diagnostics received in attempt kill event. appEvent = new RMAppFailedAttemptEvent(applicationId, @@ -2017,25 +2024,19 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt, LOG.warn("No ContainerStatus in containerFinishedEvent"); } - if (!appAttempt.getSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, - new ArrayList<>()); - appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); - appAttempt.sendFinishedContainersToNM( - appAttempt.finishedContainersSentToAM); - // there might be some completed containers that have not been pulled - // by the AM heartbeat, explicitly add them for cleanup. - appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); + appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, + new ArrayList<>()); + appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); + appAttempt.sendFinishedContainersToNM( + appAttempt.finishedContainersSentToAM); + // there might be some completed containers that have not been pulled + // by the AM heartbeat, explicitly add them for cleanup. + appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); - // mark the fact that AM container has finished so that future finished - // containers will be cleaned up without the engagement of AM containers - // (through heartbeat) - appAttempt.nonWorkPreservingAMContainerFinished = true; - } else { - appAttempt.sendFinishedAMContainerToNM(nodeId, - containerStatus.getContainerId()); - } + // mark the fact that AM container has finished so that future finished + // containers will be cleaned up without the engagement of AM containers + // (through heartbeat) + appAttempt.amContainerFinished = true; } private void addAMNodeToBlackList(NodeId nodeId) { @@ -2060,7 +2061,7 @@ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, appAttempt.justFinishedContainers.get(containerFinishedEvent .getNodeId()).add(containerFinishedEvent.getContainerStatus()); - if (appAttempt.nonWorkPreservingAMContainerFinished) { + if (appAttempt.amContainerFinished) { // AM container has finished, so no more AM heartbeats to do the cleanup. appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); }