diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 40f051babd6..ffdbb45b8dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -699,7 +699,7 @@ public void addCompletedContainer(ContainerId containerId) { @VisibleForTesting @Private public void removeOrTrackCompletedContainersFromContext( - List containerIds) throws IOException { + List containerIds) { Set removedContainers = new HashSet(); pendingContainersToRemove.addAll(containerIds); @@ -722,7 +722,6 @@ public void removeOrTrackCompletedContainersFromContext( LOG.info("Removed completed containers from NM context: " + removedContainers); } - pendingCompletedContainers.clear(); } private void trackAppsForKeepAlive(List appIds) { @@ -1297,6 +1296,7 @@ public void verifyRMHeartbeatResponseForNodeLabels( @SuppressWarnings("unchecked") public void run() { int lastHeartbeatID = 0; + boolean missedHeartbeat = false; while (!isStopped) { // Send heartbeat try { @@ -1350,6 +1350,17 @@ public void run() { removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); + // If the last heartbeat was missed, it is possible that the + // RM saw this one as a duplicate and did not process it. + // If so, we can fail to notify the RM of these completed containers + // on the next heartbeat if we clear pendingCompletedContainers. 
+ // If it wasn't a duplicate, the only impact is we might notify + // the RM twice, which it can handle. + if (!missedHeartbeat) { + pendingCompletedContainers.clear(); + } + missedHeartbeat = false; + logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); List containersToCleanup = response @@ -1429,6 +1440,7 @@ public void run() { // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); + missedHeartbeat = true; } finally { synchronized (heartbeatMonitor) { nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?