diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1f47b5f..a18b7ed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -88,6 +88,13 @@
    * kill events. */
   public static final String OBSERVE_ONLY =
       "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  /** If true, perform multiple rounds of preemption for live containers
+   * per preemption attempt instead of the default single round. Continue
+   * until desired resources are obtained or no further progress is made.
+   * During each round preempt at most one container per app. Useful when
+   * using preemption with the FairOrderingPolicy. */
+  public static final String PREEMPT_EVENLY =
+      "yarn.resourcemanager.monitor.capacity.preemption.preempt_evenly";
   /** Time in milliseconds between invocations of this policy */
   public static final String MONITORING_INTERVAL =
       "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
@@ -132,6 +139,8 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  @VisibleForTesting
+  protected boolean preemptEvenly;
   private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
       new HashMap<>();
   private RMNodeLabelsManager nlm;
@@ -173,6 +182,7 @@ public void init(Configuration config,
     percentageClusterPreemptionAllowed =
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
+    preemptEvenly = config.getBoolean(PREEMPT_EVENLY, false);
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();
   }
@@ -672,8 +682,28 @@ private void addToPreemptMap(
             break;
           }
 
-          preemptFrom(fc, clusterResource, resToObtainByPartition,
+          preemptNonLive(fc, clusterResource, resToObtainByPartition,
               skippedAMContainerlist, skippedAMSize, preemptMap);
+          if (!preemptEvenly) {
+            preemptLive(fc, clusterResource, resToObtainByPartition,
+                skippedAMContainerlist, skippedAMSize, preemptMap);
+          }
+        }
+
+        if (preemptEvenly) {
+          boolean keepTrying = true;
+          while (keepTrying) {
+            keepTrying = false;
+            desc =
+                leafQueue.getOrderingPolicy().getPreemptionIterator();
+            while (desc.hasNext()) {
+              FiCaSchedulerApp fc = desc.next();
+              if (preemptLive(fc, clusterResource, resToObtainByPartition,
+                  skippedAMContainerlist, skippedAMSize, preemptMap)) {
+                keepTrying = true;
+              }
+            }
+          }
         }
 
         // Can try preempting AMContainers (still saving atmost
@@ -732,10 +762,10 @@ private void preemptAMContainers(Resource clusterResource,
   }
 
   /**
-   * Given a target preemption for a specific application, select containers
-   * to preempt (after unreserving all reservation for that app).
+   * Given a target preemption for a specific application, select non-live
+   * containers to preempt (after unreserving all reservation for that app).
    */
-  private void preemptFrom(FiCaSchedulerApp app,
+  private void preemptNonLive(FiCaSchedulerApp app,
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
@@ -763,6 +793,40 @@ private void preemptFrom(FiCaSchedulerApp app,
       }
     }
 
+    //Find the AM containers and account for them
+    List<RMContainer> liveContainers =
+        new ArrayList<RMContainer>(app.getLiveContainers());
+
+    for (RMContainer c : liveContainers) {
+      if (resToObtainByPartition.isEmpty()) {
+        return;
+      }
+
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        skippedAMContainerlist.add(c);
+        Resources.addTo(skippedAMSize, c.getAllocatedResource());
+        continue;
+      }
+    }
+  }
+
+  /**
+   * Given a target preemption for a specific application, select live
+   * containers to preempt.
+   */
+  private boolean preemptLive(FiCaSchedulerApp app,
+      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
+    ApplicationAttemptId appId = app.getApplicationAttemptId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking at application=" + app.getApplicationAttemptId()
+          + " resourceToObtain=" + resToObtainByPartition);
+    }
+
+    boolean didPreempt = false;
+
     // if more resources are to be freed go through all live containers in
     // reverse priority and reverse allocation order and mark them for
     // preemption
@@ -773,20 +837,25 @@ private void preemptFrom(FiCaSchedulerApp app,
 
     for (RMContainer c : liveContainers) {
       if (resToObtainByPartition.isEmpty()) {
-        return;
+        return didPreempt;
       }
 
       // Skip AM Container from preemption for now.
       if (c.isAMContainer()) {
-        skippedAMContainerlist.add(c);
-        Resources.addTo(skippedAMSize, c.getAllocatedResource());
         continue;
       }
 
       // Try to preempt this container
-      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-          clusterResource, preemptMap);
+      if (tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, preemptMap)) {
+        didPreempt = true;
+        if (preemptEvenly) {
+          //if we are preempting evenly, one live container this go
+          return didPreempt;
+        }
+      }
     }
+
+    return didPreempt;
   }
 
   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 6c0ed6c..96c58f9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -209,6 +209,41 @@ public void testProportionalPreemption() {
   }
 
   @Test
+  public void testUnevenPreemption() {
+    runEven(false);
+  }
+
+  @Test
+  public void testEvenPreemption() {
+    runEven(true);
+  }
+
+  public void runEven(boolean preemptEvenly) {
+    int[][] qData = new int[][]{
+        //  /    A    B    C    D
+        { 100,  10,  40,  20,  30 },  // abs
+        { 100, 100, 100, 100, 100 },  // maxCap
+        { 100,  30,  60,  10,   0 },  // used
+        {  45,  20,   5,  20,   0 },  // pending
+        {   0,   0,   0,   0,   0 },  // reserved
+        {   3,   2,   1,   1,   0 },  // apps
+        {  -1,   1,   1,   1,   1 },  // req granularity
+        {   4,   0,   0,   0,   0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.preemptEvenly = preemptEvenly;
+    policy.editSchedule();
+    if (preemptEvenly) {
+      verify(mDisp, times(8)).handle(argThat(new IsPreemptionRequestFor(appA)));
+      verify(mDisp, times(8)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    } else {
+      verify(mDisp, times(1)).handle(argThat(new IsPreemptionRequestFor(appA)));
+      verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    }
+  }
+
+  @Test
   public void testMaxCap() {
     int[][] qData = new int[][]{
         //  /   A   B   C
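Note (not part of the patch): the sketch below shows one way the new flag could be wired up programmatically, next to the existing observe_only property shown in the patch context. In a real deployment the same key would normally be set in yarn-site.xml; the class name and the standalone main() here are illustrative assumptions, and only OBSERVE_ONLY and PREEMPT_EVENLY come from the patched class.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;

// Illustrative only: enable the new preempt_evenly behaviour alongside the
// existing observe_only knob defined on ProportionalCapacityPreemptionPolicy.
public class PreemptEvenlyConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Existing property shown in the patch context: when true, run the policy
    // without issuing preemption/kill events.
    conf.setBoolean(ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY, false);
    // New property added by the patch: preempt at most one live container per
    // app per round, looping until the target is met or no progress is made.
    conf.setBoolean(ProportionalCapacityPreemptionPolicy.PREEMPT_EVENLY, true);

    System.out.println("preempt_evenly = " + conf.getBoolean(
        ProportionalCapacityPreemptionPolicy.PREEMPT_EVENLY, false));
  }
}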
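To see why runEven(true) expects 8 preemptions from each of appA and appB while runEven(false) expects a 1/15 split, the toy model below mimics the keepTrying loop from the patch: each pass takes at most one container per app, and passes repeat until the demand is met or no app can yield. The numbers (roughly 15 containers per app, 16 to preempt in total) are an assumption about how the test's mock splits queue A's usage between its two apps; this is a simplified illustration, not the scheduler's actual accounting.

import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of "preempt evenly": one container per app per pass.
public class EvenPreemptionToy {
  public static void main(String[] args) {
    // Assumed per-app container counts for queue A in the test scenario.
    Map<String, Integer> containers = new LinkedHashMap<>();
    containers.put("appA", 15);
    containers.put("appB", 15);
    int toPreempt = 16;   // total containers preempted in both test variants

    Map<String, Integer> preempted = new LinkedHashMap<>();
    boolean keepTrying = true;
    while (toPreempt > 0 && keepTrying) {
      keepTrying = false;                      // reset, as the patch does
      for (Map.Entry<String, Integer> app : containers.entrySet()) {
        if (toPreempt == 0) {
          break;
        }
        if (app.getValue() > 0) {
          app.setValue(app.getValue() - 1);    // take one container this pass
          preempted.merge(app.getKey(), 1, Integer::sum);
          toPreempt--;
          keepTrying = true;                   // progress was made, go again
        }
      }
    }
    // Prints {appA=8, appB=8}; draining one app at a time instead yields a
    // 15/1-style split, which is the shape of the uneven expectation.
    System.out.println(preempted);
  }
}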