Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java	(revision 1592939)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java	(working copy)
@@ -37,6 +37,7 @@
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
@@ -109,7 +110,11 @@
    * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
-
+  /** If true, application master containers are given the lowest preemption
+   * priority when choosing which containers to preempt across all
+   * applications. */
+  public static final String SKIP_AM_CONTAINER_FROM_PREEMPTION =
+      "yarn.resourcemanager.monitor.capacity.preemption.skip_am_container";
   //the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -124,6 +129,7 @@
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private boolean skipAMContainer;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -162,6 +168,8 @@
     percentageClusterPreemptionAllowed =
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
+    skipAMContainer = config.getBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION,
+        false);
     rc = scheduler.getResourceCalculator();
   }
 
@@ -387,6 +395,7 @@
     Map<ApplicationAttemptId, Set<RMContainer>> list =
         new HashMap<ApplicationAttemptId, Set<RMContainer>>();
+    List<RMContainer> skippedAMContainerList = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
       // we act only if we are violating balance by more than
@@ -411,8 +420,24 @@
           break;
         }
         list.put(fc.getApplicationAttemptId(),
-            preemptFrom(fc, clusterResource, resToObtain));
+            preemptFrom(fc, clusterResource, resToObtain, skippedAMContainerList));
       }
+      // If skipAMContainer is disabled, skippedAMContainerList will be empty.
+      for (RMContainer c : skippedAMContainerList) {
+        if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+            Resources.none())) {
+          break;
+        }
+        Set<RMContainer> contToPreempt = list.get(c
+            .getApplicationAttemptId());
+        if (null == contToPreempt) {
+          contToPreempt = new HashSet<RMContainer>();
+          list.put(c.getApplicationAttemptId(), contToPreempt);
+        }
+        contToPreempt.add(c);
+        Resources.subtractFrom(resToObtain, c.getContainer().getResource());
+      }
+      skippedAMContainerList.clear();
     }
   }
 
@@ -426,10 +451,12 @@
    * @param app
    * @param clusterResource
    * @param rsrcPreempt
+   * @param skippedAMContainerList
    * @return
    */
-  private Set<RMContainer> preemptFrom(
-      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+  private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Resource rsrcPreempt,
+      List<RMContainer> skippedAMContainerList) {
     Set<RMContainer> ret = new HashSet<RMContainer>();
     ApplicationAttemptId appId = app.getApplicationAttemptId();
 
@@ -457,10 +484,16 @@
     sortContainers(containers);
 
     for (RMContainer c : containers) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt,
+          Resources.none())) {
         return ret;
       }
+      if (skipAMContainer && c.getContainer().getPriority()
+          .equals(RMAppAttemptImpl.AM_CONTAINER_PRIORITY)) {
+        // Skip AM Container (which has priority of 0) for now.
+        skippedAMContainerList.add(c);
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java	(revision 1592941)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java	(working copy)
@@ -58,6 +58,7 @@
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SKIP_AM_CONTAINER_FROM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.*;
@@ -68,6 +69,7 @@
   static final long TS = 3141592653L;
 
   int appAlloc = 0;
+  boolean setContPriority = false;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
@@ -351,7 +353,71 @@
     assert containers.get(4).equals(rm5);
   }
 
+  @Test
+  public void testSkipAMContainer() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 50, 50 }, // abs
+        { 100, 100, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    conf.setBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION, true);
+    setContPriority = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping AM Container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since AM containers of appC and appD are saved, 2 containers from appB
+    // have to be preempted.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    setContPriority = false;
+  }
+
+  @Test
+  public void testPreemptSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    conf.setBoolean(SKIP_AM_CONTAINER_FROM_PREEMPTION, true);
+    setContPriority = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // All 5 containers of appD will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // All 5 containers of appC will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setContPriority = false;
+  }
+
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -487,8 +553,21 @@
     when(app.getReservedContainers()).thenReturn(cReserved);
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
+    int[] priority = new int[used];
+
+    // If setContPriority is enabled, try to have containers from different
+    // priorities.
+    if (setContPriority) {
+      for (int i = 0; i < used; i += gran) {
+        priority[i] = i;
+      }
+    }
     for (int i = 0; i < used; i += gran) {
-      cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      if (setContPriority) {
+        cLive.add(mockContainer(appAttId, cAlloc, unit, priority[i]));
+      } else {
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      }
       ++cAlloc;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
@@ -504,6 +583,7 @@
     RMContainer mC = mock(RMContainer.class);
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
+    when(mC.getApplicationAttemptId()).thenReturn(appAttId);
     return mC;
   }