diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 0743f60..6362d2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -116,9 +116,6 @@ public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity."; - public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption"; - // the dispatcher to send preempt and kill events public EventHandler dispatcher; @@ -227,7 +224,7 @@ private void containerBasedPreemptOrKill(CSQueue root, // extract a summary of the queues from scheduler TempQueue tRoot; synchronized (scheduler) { - tRoot = cloneQueues(root, clusterResources, false); + tRoot = cloneQueues(root, clusterResources); } // compute the ideal distribution of resources among queues @@ -728,11 +725,9 @@ public String getPolicyName() { * * @param root the root of the CapacityScheduler queue hierarchy * @param clusterResources the total amount of resources in the cluster - * @param parentDisablePreempt true if disable preemption is set for parent * @return the root of the cloned queue hierarchy */ - private TempQueue cloneQueues(CSQueue root, Resource clusterResources, - boolean parentDisablePreempt) { + private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { TempQueue ret; synchronized (root) { String queueName = root.getQueueName(); @@ -744,12 +739,6 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources, Resource guaranteed = Resources.multiply(clusterResources, absCap); Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap); - boolean queueDisablePreemption = false; - String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath() - + SUFFIX_DISABLE_PREEMPTION; - queueDisablePreemption = scheduler.getConfiguration() - .getBoolean(queuePropName, parentDisablePreempt); - Resource extra = Resource.newInstance(0, 0); if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) { extra = Resources.subtract(current, guaranteed); @@ -759,7 +748,7 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources, Resource pending = l.getTotalResourcePending(); ret = new TempQueue(queueName, current, pending, guaranteed, maxCapacity); - if (queueDisablePreemption) { + if (!root.isPreemptable()) { ret.untouchableExtra = extra; } else { ret.preemptableExtra = extra; @@ -771,8 +760,7 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources, maxCapacity); Resource childrensPreemptable = Resource.newInstance(0, 0); for (CSQueue c : root.getChildQueues()) { - TempQueue subq = - cloneQueues(c, clusterResources, queueDisablePreemption); + TempQueue subq = cloneQueues(c, clusterResources); Resources.addTo(childrensPreemptable, subq.preemptableExtra); ret.addChild(subq); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fec3a56..cbe166a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -38,14 +39,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; - import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { CSQueue parent; final String queueName; - float capacity; float maximumCapacity; float absoluteCapacity; @@ -74,10 +73,12 @@ Map acls = new HashMap(); boolean reservationsContinueLooking; - + private boolean preemptable; + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - + private CapacitySchedulerContext csContext; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.minimumAllocation = cs.getMinimumResourceCapability(); @@ -120,6 +121,8 @@ public AbstractCSQueue(CapacitySchedulerContext cs, maxCapacityByNodeLabels = cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), accessibleLabels, labelManager); + + this.csContext = cs; } @Override @@ -318,6 +321,8 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity, absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels); this.reservationsContinueLooking = reservationContinueLooking; + + this.preemptable = isQueuePathHierarchyPreemptable(this); } protected QueueInfo getQueueInfo() { @@ -454,4 +459,42 @@ public boolean getReservationContinueLooking() { public Resource getUsedResourceByLabel(String nodeLabel) { return usedResourcesByNodeLabels.get(nodeLabel); } + + @Private + public boolean isPreemptable() { + return preemptable; + } + + /** + * The specified queue is preemptable if system-wide preemption is turned on + * unless any queue in the qPath hierarchy has explicitly turned + * preemption off. + * NOTE: Preemptability is inherited from a queue's parent. + * + * @return true if queue is preemptable, false otherwise + */ + private boolean isQueuePathHierarchyPreemptable(CSQueue q) { + CapacitySchedulerConfiguration csConf = csContext.getConfiguration(); + boolean systemWidePreemption = + csConf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + CSQueue parentQ = q.getParent(); + + // If the system-wide preemption switch is turned off, none of the queues in + // the qPath hierarchy are preemptable, so return false. + if (!systemWidePreemption) return false; + + // If q is the root queue and the system-wide preemption switch is turned + // on, then q is preemptable (default=true, below) unless explicitly turned + // off. + if (parentQ == null) { + return csConf.getQueuePreemptable(q.getQueuePath(), true); + } + + // If this is not the root queue, recursively walk up qPath's hierarchy so + // that the default value can be inherited from each parent. Preemptability + // will be inherited from the parent(s) unless explicitly overridden at + // this level. + return csConf.getQueuePreemptable(q.getQueuePath(), parentQ.isPreemptable()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 07a7e0e..b512d5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -300,4 +300,10 @@ public void attachContainer(Resource clusterResource, * @return capacity by node label */ public float getCapacityByNodeLabel(String nodeLabel); + + /** + * Check whether a queue is preemptable + * @return true if queue is preemptable, false if not + */ + public boolean isPreemptable(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 5bbb436..8f84a6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -181,6 +181,9 @@ public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; @Private + public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + + @Private public static class QueueMapping { public enum MappingType { @@ -802,4 +805,35 @@ public long getEnforcementWindow(String queue) { DEFAULT_RESERVATION_ENFORCEMENT_WINDOW); return enforcementWindow; } + + /** + * Sets the disable_preemption property in order to indicate + * whether or not containers on a specific queue can be preempted. + * NOTE: The value of the disable_preemption property will be + * set to the opposite of the isPreemptable parameter. Turning + * off preemptability (isPreemptable==false) means disabling + * preemption (disable_preemption==true). + * + * @param queue + * @param isPreemptable + */ + public void setQueuePreemptable(String queue, boolean isPreemptable) { + setBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED, + !isPreemptable); + } + + /** + * Indicates whether containers on the specified queue can be preempted. + * + * @param queue queue to query + * @param defaultVal used as default if the disable_preemption + * is not set in the configuration + * @return + */ + public boolean getQueuePreemptable(String queue, boolean defaultVal) { + boolean preemptionDisabled = + getBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED, + !defaultVal); + return !preemptionDisabled; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index fd8a7ee..d77bd78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -296,7 +296,8 @@ protected synchronized void setupQueueConfigs( "labels=" + labelStrBuilder.toString() + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + - reservationsContinueLooking + "\n"); + reservationsContinueLooking + "\n" + + "preemptable = " + isPreemptable() + "\n"); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index 1f65b88..5a0cb70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -121,7 +121,8 @@ protected void render(Block html) { _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). _("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())). _r("Active Users: ", activeUserList.toString()). - _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())); + _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())). + _("Preemptable:", lqinfo.isPreemptable()); html._(InfoBlock.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index bb4c749..16466aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -37,6 +37,7 @@ protected float userLimitFactor; protected ResourceInfo aMResourceLimit; protected ResourceInfo userAMResourceLimit; + protected boolean preemptable; CapacitySchedulerLeafQueueInfo() { }; @@ -53,6 +54,7 @@ userLimitFactor = q.getUserLimitFactor(); aMResourceLimit = new ResourceInfo(q.getAMResourceLimit()); userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit()); + preemptable = q.isPreemptable(); } public int getNumActiveApplications() { @@ -95,4 +97,8 @@ public ResourceInfo getAMResourceLimit() { public ResourceInfo getUserAMResourceLimit() { return userAMResourceLimit; } + + public boolean isPreemptable() { + return preemptable; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 0a147f4..7b20a7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -17,12 +17,10 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; @@ -52,6 +50,7 @@ import java.util.NavigableSet; import java.util.Random; import java.util.Set; +import java.util.StringTokenizer; import java.util.TreeSet; import org.apache.commons.collections.map.HashedMap; @@ -322,24 +321,22 @@ public void testPerQueueDisablePreemption() { { 3, 0, 0, 0 }, // subqueues }; - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true); + schedConf.setQueuePreemptable("root.queueB", false); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // With PREEMPTION_DISABLED set for queueB, get resources from queueC + // Since queueB is not preemptable, get resources from queueC verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); - // With no PREEMPTION_DISABLED set for queueB, resources will be preempted - // from both queueB and queueC. Test must be reset for so that the mDisp + // Since queueB is preemptable, resources will be preempted + // from both queueB and queueC. Test must be reset so that the mDisp // event handler will count only events from the following test and not the // previous one. setup(); + schedConf.setQueuePreemptable("root.queueB", true); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); - - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false); + policy2.editSchedule(); verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); @@ -375,9 +372,8 @@ public void testPerQueueDisablePreemptionHierarchical() { // Need to call setup() again to reset mDisp setup(); - // Disable preemption for queueB and it's children - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true); + // Turn off preemption for queueB and it's children + schedConf.setQueuePreemptable("root.queueA.queueB", false); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); ApplicationAttemptId expectedAttemptOnQueueC = @@ -423,9 +419,8 @@ public void testPerQueueDisablePreemptionBroadHierarchical() { // Need to call setup() again to reset mDisp setup(); - // Disable preemption for queueB(appA) - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true); + // Turn off preemption for queueB(appA) + schedConf.setQueuePreemptable("root.queueA.queueB", false); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); // Now that queueB(appA) is not preemptable, verify that resources come @@ -434,11 +429,9 @@ public void testPerQueueDisablePreemptionBroadHierarchical() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); setup(); - // Disable preemption for two of the 3 queues with over-capacity. - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true); - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true); + // Turn off preemption for two of the 3 queues with over-capacity. + schedConf.setQueuePreemptable("root.queueD.queueE", false); + schedConf.setQueuePreemptable("root.queueA.queueB", false); ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData); policy3.editSchedule(); @@ -476,11 +469,10 @@ public void testPerQueueDisablePreemptionInheritParent() { verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB))); - // Disable preemption for queueA and it's children. queueF(appC)'s request + // Turn off preemption for queueA and it's children. queueF(appC)'s request // should starve. setup(); // Call setup() to reset mDisp - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true); + schedConf.setQueuePreemptable("root.queueA", false); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC @@ -504,8 +496,7 @@ public void testPerQueuePreemptionNotAllUntouchable() { { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues }; - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true); + schedConf.setQueuePreemptable("root.queueA.queueC", false); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // Although queueC(appB) is way over capacity and is untouchable, @@ -529,9 +520,8 @@ public void testPerQueueDisablePreemptionRootDisablesAll() { { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues }; + schedConf.setQueuePreemptable("root", false); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); - schedConf.setBoolean(BASE_YARN_RM_PREEMPTION - + "root" + SUFFIX_DISABLE_PREEMPTION, true); policy.editSchedule(); // All queues should be non-preemptable, so request should starve. verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC @@ -893,7 +883,7 @@ public void testAMResourcePercentForSkippedAMContainers() { verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); setAMContainer = false; } - + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; @@ -952,6 +942,8 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); when(root.getQueuePath()).thenReturn("root"); + boolean isPreemptable = mockPreemptionStatus("root"); + when(root.isPreemptable()).thenReturn(isPreemptable); for (int i = 1; i < queues.length; ++i) { final CSQueue q; @@ -971,11 +963,29 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, parentPathName = (parentPathName == null) ? "root" : parentPathName; String queuePathName = (parentPathName+"."+queueName).replace("/","root"); when(q.getQueuePath()).thenReturn(queuePathName); + isPreemptable = mockPreemptionStatus(queuePathName); + when(q.isPreemptable()).thenReturn(isPreemptable); } assert 0 == pqs.size(); return root; } + // Determine if any of the elements in the queupath have preemption disabled. + // Also must handle the case where preemption disabled property is explicitly + // set to something other than the default. Assumes system-wide preemption + // property is true. + private boolean mockPreemptionStatus(String queuePathName) { + boolean qIsPreemptable = true; + StringTokenizer tokenizer = new StringTokenizer(queuePathName, "."); + String qName = ""; + while(tokenizer.hasMoreTokens()) { + qName += tokenizer.nextToken(); + qIsPreemptable = schedConf.getQueuePreemptable(qName, qIsPreemptable); + qName += "."; + } + return qIsPreemptable; + } + ParentQueue mockParentQueue(ParentQueue p, int subqueues, Deque pqs) { ParentQueue pq = mock(ParentQueue.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 2aa57a0..c12154f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -2071,4 +2072,55 @@ public void testAppReservationWithDominantResourceCalculator() throws Exception Assert.assertEquals(0, report.getNumReservedContainers()); rm.stop(); } + + @Test + public void testIsPreemptable() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, rmContext); + + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueB2 = findQueue(queueB, B2); + + // When preemption turned on for the whole system + // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other + // preemption properties set, queue root.b.b2 should be preemptable. + assertTrue("queue " + B2 + " should default to preemptable", + queueB2.isPreemptable()); + + // The preemption property should be inherited from root all the + // way down so that root.b.b2 should NOT be preemptable. + conf.setQueuePreemptable(rootQueue.getQueuePath(), false); + cs.reinitialize(conf, rmContext); + assertFalse( + "queue " + B2 + " should have inherited non-preemptability from root", + queueB2.isPreemptable()); + + // Enable preemption for root (grandparent) but disable for root.b (parent). + // root.b.b2 should inherit property from parent and NOT be preemptable + conf.setQueuePreemptable(rootQueue.getQueuePath(), true); + conf.setQueuePreemptable(queueB.getQueuePath(), false); + cs.reinitialize(conf, rmContext); + assertFalse( + "queue " + B2 + " should have inherited non-preemptability from parent", + queueB2.isPreemptable()); + + // When preemption is turned on for root.b.b2, it should be preemptable + // even though preemption is disabled on root.b (parent). + conf.setQueuePreemptable(queueB2.getQueuePath(), true); + cs.reinitialize(conf, rmContext); + assertTrue("queue " + B2 + " should have been preemptable", + queueB2.isPreemptable()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index ef7435a..94040b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -347,7 +347,7 @@ private void verifySubQueue(JSONObject info, String q, int numExpectedElements = 13; boolean isParentQueue = true; if (!info.has("queues")) { - numExpectedElements = 23; + numExpectedElements = 24; isParentQueue = false; } assertEquals("incorrect number of elements", numExpectedElements, info.length());