diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 9c4a8a6..b290300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -137,14 +138,12 @@ private SecretKey clientTokenMasterKey = null; private ConcurrentMap> - justFinishedContainers = - new ConcurrentHashMap>(); + justFinishedContainers = new ConcurrentHashMap<>(); // Tracks the previous finished containers that are waiting to be // verified as received by the AM. If the AM sends the next allocate // request it implicitly acks this list. private ConcurrentMap> - finishedContainersSentToAM = - new ConcurrentHashMap>(); + finishedContainersSentToAM = new ConcurrentHashMap<>(); private Container masterContainer; private float progress = 0; @@ -696,7 +695,7 @@ public float getProgress() { public List getJustFinishedContainers() { this.readLock.lock(); try { - List returnList = new ArrayList(); + List returnList = new ArrayList<>(); for (Collection containerStatusList : justFinishedContainers.values()) { returnList.addAll(containerStatusList); @@ -735,7 +734,7 @@ public float getProgress() { this.writeLock.lock(); try { - List returnList = new ArrayList(); + List returnList = new ArrayList<>(); // A new allocate means the AM received the previously sent // finishedContainers. We can ack this to NM now @@ -743,15 +742,17 @@ public float getProgress() { // Mark every containerStatus as being sent to AM though we may return // only the ones that belong to the current attempt - boolean keepContainersAcressAttempts = this.submissionContext + boolean keepContainersAcrossAppAttempts = this.submissionContext .getKeepContainersAcrossApplicationAttempts(); - for (NodeId nodeId:justFinishedContainers.keySet()) { - - // Clear and get current values - List finishedContainers = justFinishedContainers.put - (nodeId, new ArrayList()); + for (Map.Entry> entry : + justFinishedContainers.entrySet()) { + NodeId nodeId = entry.getKey(); + List finishedContainers = entry.getValue(); + if (finishedContainers.isEmpty()) { + continue; + } - if (keepContainersAcressAttempts) { + if (keepContainersAcrossAppAttempts) { returnList.addAll(finishedContainers); } else { // Filter out containers from previous attempt @@ -763,10 +764,11 @@ public float getProgress() { } } - finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList - ()); + finishedContainersSentToAM.putIfAbsent(nodeId, + new ArrayList()); finishedContainersSentToAM.get(nodeId).addAll(finishedContainers); } + justFinishedContainers.clear(); return returnList; } finally { @@ -1706,7 +1708,7 @@ private void sendFinishedContainersToNM() { finishedContainersSentToAM.put(nodeId, new ArrayList()); List containerIdList = - new ArrayList(currentSentContainers.size()); + new ArrayList<>(currentSentContainers.size()); for (ContainerStatus containerStatus : currentSentContainers) { containerIdList.add(containerStatus.getContainerId()); }