Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java (date 1486680304000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java (date 1486753978000) @@ -543,29 +543,25 @@ } void trackContainerForPreemption(RMContainer container) { - if (containersToPreempt.add(container)) { - synchronized (preemptedResources) { + synchronized (preemptedResources) { + if (containersToPreempt.add(container)) { Resources.addTo(preemptedResources, container.getAllocatedResource()); } } } private void untrackContainerForPreemption(RMContainer container) { - if (containersToPreempt.remove(container)) { - synchronized (preemptedResources) { + synchronized (preemptedResources) { + if (containersToPreempt.remove(container)) { Resources.subtractFrom(preemptedResources, container.getAllocatedResource()); } } } - Set getPreemptionContainers() { - return containersToPreempt; - } - - private Resource getPreemptedResources() { + Collection getPreemptionContainers() { synchronized (preemptedResources) { - return preemptedResources; + return new ArrayList(containersToPreempt); } } @@ -582,9 +578,11 @@ return false; } - if (containersToPreempt.contains(container)) { - // The container is already under consideration for preemption - return false; + synchronized (preemptedResources) { + if (containersToPreempt.contains(container)) { + // The container is already under consideration for preemption + return false; + } } // Check if the app's allocation will be over its fairshare even @@ -960,7 +958,8 @@ if (LOG.isTraceEnabled()) { LOG.trace("Assign container on " + node.getNodeName() + " node, assignType: OFF_SWITCH" + ", allowedLocality: " - + allowedLocality + ", priority: " + schedulerKey.getPriority() + + allowedLocality + ", priority: " + + schedulerKey.getPriority() + ", app attempt id: " + this.attemptId); } return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH, @@ -1135,13 +1134,11 @@ @Override public Resource getResourceUsage() { - /* - * getResourcesToPreempt() returns zero, except when there are containers - * to preempt. Avoid creating an object in the common case. - */ - return getPreemptedResources().equals(Resources.none()) - ? getCurrentConsumption() - : Resources.subtract(getCurrentConsumption(), getPreemptedResources()); + // Subtract copies the object, so that we have a snapshot, + // in case usage changes, while the caller is using the value + synchronized (preemptedResources) { + return Resources.subtract(getCurrentConsumption(), preemptedResources); + } } @Override Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (date 1486680304000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (date 1486753978000) @@ -849,18 +849,20 @@ application.getWriteLock().unlock(); } + Collection preemptionContainers = + application.getPreemptionContainers(); if (LOG.isDebugEnabled()) { LOG.debug( "allocate: post-update" + " applicationAttemptId=" + appAttemptId + " #ask=" + ask.size() + " reservation= " + application .getCurrentReservation()); - LOG.debug("Preempting " + application.getPreemptionContainers().size() + LOG.debug("Preempting " + preemptionContainers.size() + " container(s)"); } Set preemptionContainerIds = new HashSet(); - for (RMContainer container : application.getPreemptionContainers()) { + for (RMContainer container : preemptionContainers) { preemptionContainerIds.add(container.getContainerId()); }