diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 9ec0f82..5b78eb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -144,14 +145,12 @@ private SecretKey clientTokenMasterKey = null; private ConcurrentMap> - justFinishedContainers = - new ConcurrentHashMap>(); + justFinishedContainers = new ConcurrentHashMap<>(); // Tracks the previous finished containers that are waiting to be // verified as received by the AM. If the AM sends the next allocate // request it implicitly acks this list. private ConcurrentMap> - finishedContainersSentToAM = - new ConcurrentHashMap>(); + finishedContainersSentToAM = new ConcurrentHashMap<>(); private volatile Container masterContainer; private float progress = 0; @@ -759,7 +758,7 @@ public float getProgress() { public List getJustFinishedContainers() { this.readLock.lock(); try { - List returnList = new ArrayList(); + List returnList = new ArrayList<>(); for (Collection containerStatusList : justFinishedContainers.values()) { returnList.addAll(containerStatusList); @@ -798,7 +797,7 @@ public float getProgress() { this.writeLock.lock(); try { - List returnList = new ArrayList(); + List returnList = new ArrayList<>(); // A new allocate means the AM received the previously sent // finishedContainers. We can ack this to NM now @@ -806,15 +805,17 @@ public float getProgress() { // Mark every containerStatus as being sent to AM though we may return // only the ones that belong to the current attempt - boolean keepContainersAcressAttempts = this.submissionContext + boolean keepContainersAcrossAppAttempts = this.submissionContext .getKeepContainersAcrossApplicationAttempts(); - for (NodeId nodeId:justFinishedContainers.keySet()) { - - // Clear and get current values - List finishedContainers = justFinishedContainers.put - (nodeId, new ArrayList()); + for (Map.Entry> entry : + justFinishedContainers.entrySet()) { + NodeId nodeId = entry.getKey(); + List finishedContainers = entry.getValue(); + if (finishedContainers.isEmpty()) { + continue; + } - if (keepContainersAcressAttempts) { + if (keepContainersAcrossAppAttempts) { returnList.addAll(finishedContainers); } else { // Filter out containers from previous attempt @@ -826,12 +827,10 @@ public float getProgress() { } } - if (!finishedContainers.isEmpty()) { - finishedContainersSentToAM.putIfAbsent(nodeId, - new ArrayList()); - finishedContainersSentToAM.get(nodeId).addAll(finishedContainers); - } + finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList<>()); + finishedContainersSentToAM.get(nodeId).addAll(finishedContainers); } + justFinishedContainers.clear(); return returnList; } finally { @@ -936,8 +935,7 @@ public void transferStateFromAttempt(RMAppAttempt attempt) { for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) { List containerStatuses = this.finishedContainersSentToAM.get(nodeId); - this.justFinishedContainers.putIfAbsent(nodeId, - new ArrayList()); + this.justFinishedContainers.putIfAbsent(nodeId, new ArrayList<>()); this.justFinishedContainers.get(nodeId).addAll(containerStatuses); } this.finishedContainersSentToAM.clear(); @@ -1863,10 +1861,9 @@ private void sendFinishedContainersToNM() { // Clear and get current values List currentSentContainers = - finishedContainersSentToAM.put(nodeId, - new ArrayList()); + finishedContainersSentToAM.put(nodeId, new ArrayList<>()); List containerIdList = - new ArrayList(currentSentContainers.size()); + new ArrayList<>(currentSentContainers.size()); for (ContainerStatus containerStatus : currentSentContainers) { containerIdList.add(containerStatus.getContainerId()); } @@ -1897,7 +1894,7 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt, if (!appAttempt.getSubmissionContext() .getKeepContainersAcrossApplicationAttempts()) { appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, - new ArrayList()); + new ArrayList<>()); appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); appAttempt.sendFinishedContainersToNM(); } else { @@ -1924,7 +1921,7 @@ public BlacklistManager getAMBlacklistManager() { private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent - .getNodeId(), new ArrayList()); + .getNodeId(), new ArrayList<>()); appAttempt.justFinishedContainers.get(containerFinishedEvent .getNodeId()).add(containerFinishedEvent.getContainerStatus()); }