diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 7e668b4..43147e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -192,7 +192,8 @@ public void editSchedule() { @SuppressWarnings("unchecked") private void preemptOrkillSelectedContainerAfterWait( - Map> selectedCandidates) { + Map> selectedCandidates, + long currentTime) { // preempt (or kill) the selected containers for (Map.Entry> e : selectedCandidates .entrySet()) { @@ -204,8 +205,7 @@ private void preemptOrkillSelectedContainerAfterWait( for (RMContainer container : e.getValue()) { // if we tried to preempt this for more than maxWaitTime if (preemptionCandidates.get(container) != null - && preemptionCandidates.get(container) + maxWaitTime < clock - .getTime()) { + && preemptionCandidates.get(container) + maxWaitTime <= currentTime) { // kill it rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, @@ -221,7 +221,7 @@ private void preemptOrkillSelectedContainerAfterWait( rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); - preemptionCandidates.put(container, clock.getTime()); + preemptionCandidates.put(container, currentTime); } } } @@ -243,13 +243,15 @@ private void syncKillableContainersFromScheduler() { } } - private void cleanupStaledPreemptionCandidates() { + private void cleanupStaledPreemptionCandidates(long currentTime) { // Keep the preemptionCandidates list clean for (Iterator i = preemptionCandidates.keySet().iterator(); i.hasNext(); ) { RMContainer id = i.next(); // garbage collect containers that are irrelevant for preemption - if (preemptionCandidates.get(id) + 2 * maxWaitTime < clock.getTime()) { + // And avoid preempt selected containers for *this execution* + // or within 1 ms + if (preemptionCandidates.get(id) + 2 * maxWaitTime < currentTime) { i.remove(); } } @@ -335,11 +337,13 @@ private void containerBasedPreemptOrKill(CSQueue root, // containers. The bottom line is, we shouldn't preempt a queue which is already // below its guaranteed resource. + long currentTime = clock.getTime(); + // preempt (or kill) the selected containers - preemptOrkillSelectedContainerAfterWait(toPreempt); + preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime); // cleanup staled preemption candidates - cleanupStaledPreemptionCandidates(); + cleanupStaledPreemptionCandidates(currentTime); } @Override