Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (revision 1568227) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; @@ -461,8 +462,10 @@ rsrcPreempt, Resources.none())) { return ret; } - ret.add(c); - Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); + if (c.getState().equals(RMContainerState.RUNNING)) { + ret.add(c); + Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); + } } return ret; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (revision 1568227) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (working copy) @@ -77,6 +77,9 @@ if (null == liveContainers.remove(rmContainer.getContainerId())) { return false; } + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (revision 1568227) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (working copy) @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.NavigableSet; @@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -351,6 +353,60 @@ assert containers.get(4).equals(rm5); } + + @Test + public void testConatinerStateForPreemption() { + int[][] qData = new int[][] { + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 55, 45, 0 }, // used + { 20, 10, 10, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 2, 1, 1, 0 }, // apps + { -1, 1, 1, 0 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + CSQueue rootQueue = mCS.getRootQueue(); + + // Send the Container status as ALLOCATED + updateRMContainerState(rootQueue, false); + policy.editSchedule(); + verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); + + // Update the Container status as RUNNING + updateRMContainerState(rootQueue, true); + policy.editSchedule(); + + // correct imbalance between over-capacity queues + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + public void updateRMContainerState(CSQueue rootQueue, + boolean setContStateRunning) { + if (rootQueue instanceof LeafQueue) { + LeafQueue leafQueue = (LeafQueue) rootQueue; + NavigableSet ns = (NavigableSet) leafQueue + .getApplications(); + Iterator desc = ns.descendingIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp app = desc.next(); + List containers = new ArrayList( + app.getLiveContainers()); + for (RMContainer rc : containers) { + if (setContStateRunning) { + when(rc.getState()).thenReturn(RMContainerState.RUNNING); + } else { + when(rc.getState()).thenReturn(RMContainerState.ALLOCATED); + } + } + } + } else { + for (CSQueue c : rootQueue.getChildQueues()) { + updateRMContainerState(c, setContStateRunning); + } + } + } static class IsPreemptionRequestFor extends ArgumentMatcher {