diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0b40dd533a0..09ad3391b09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -659,7 +659,7 @@ public void addCompletedContainer(ContainerId containerId) { @VisibleForTesting @Private public void removeOrTrackCompletedContainersFromContext( - List containerIds) throws IOException { + List containerIds) { Set removedContainers = new HashSet(); pendingContainersToRemove.addAll(containerIds); @@ -676,13 +676,13 @@ public void removeOrTrackCompletedContainersFromContext( removedContainers.add(containerId); iter.remove(); } + pendingCompletedContainers.remove(containerId); } if (!removedContainers.isEmpty()) { LOG.info("Removed completed containers from NM context: " + removedContainers); } - pendingCompletedContainers.clear(); } private void trackAppsForKeepAlive(List appIds) { @@ -790,6 +790,7 @@ protected void startStatusUpdater() { @SuppressWarnings("unchecked") public void run() { int lastHeartbeatID = 0; + boolean missedHearbeat = false; while (!isStopped) { // Send heartbeat try { @@ -836,6 +837,20 @@ public void run() { removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); + // If the last heartbeat was missed, it is possible that the + // RM saw this one as a duplicate and did not process it. + // If so, we can fail to notify the RM of these completed containers + // on the next heartbeat if we clear pendingCompletedContainers. + // If it wasn't a duplicate, the only impact is we might notify + // the RM twice, which it can handle. + if (!missedHearbeat) { + pendingCompletedContainers.clear(); + } else { + LOG.info("skipped clearing pending completed containers due to " + + "missed heartbeat"); + missedHearbeat = false; + } + logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); List containersToCleanup = response @@ -911,6 +926,7 @@ public void run() { // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); + missedHearbeat = true; } finally { synchronized (heartbeatMonitor) { nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 0d73a724076..fb45144adb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -692,15 +692,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); - if (heartBeatID == 2) { - // NM should send completed containers again, since the last - // heartbeat is lost. - Assert.assertEquals(4, statuses.size()); - } else { - // NM should not send completed containers again, since the last - // heartbeat is successful. - Assert.assertEquals(2, statuses.size()); - } + // NM should send completed containers on heartbeat 2, + // since heartbeat 1 was lost. It will send them again on + // heartbeat 3, because it does not clear them if the previous + // heartbeat was lost in case the RM treated it as a duplicate. + Assert.assertEquals(4, statuses.size()); Assert.assertEquals(4, context.getContainers().size()); boolean container2Exist = false, container3Exist = false, @@ -731,14 +727,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) container5Exist = true; } } - if (heartBeatID == 2) { - Assert.assertTrue(container2Exist && container3Exist - && container4Exist && container5Exist); - } else { - // NM do not send completed containers again - Assert.assertTrue(container2Exist && !container3Exist - && container4Exist && !container5Exist); - } + Assert.assertTrue(container2Exist && container3Exist + && container4Exist && container5Exist); if (heartBeatID == 3) { finishedContainersPulledByAM.add(containerStatus3.getContainerId());