diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index f984fef..250f18c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -90,7 +90,7 @@ private final Map schedulingPolicies; - private final SchedulingPolicy defaultSchedulingPolicy; + private SchedulingPolicy defaultSchedulingPolicy; // Policy for mapping apps to queues @VisibleForTesting @@ -314,7 +314,7 @@ public SchedulingPolicy getSchedulingPolicy(String queueName) { public SchedulingPolicy getDefaultSchedulingPolicy() { return defaultSchedulingPolicy; } - + public Map> getConfiguredQueues() { return configuredQueues; } @@ -382,4 +382,9 @@ public void setReservationWindow(long window) { public void setAverageCapacity(int avgCapacity) { globalReservationQueueConfig.setAverageCapacity(avgCapacity); } + + @VisibleForTesting + protected void setDefaultSchedulingPolicy(SchedulingPolicy policy) { + defaultSchedulingPolicy = policy; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 661caa7..a4cade2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -237,6 +237,7 @@ public synchronized void reloadAllocations() throws IOException, long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; + Map parents = new HashMap<>(); // Reservation global configuration knobs String planner = null; @@ -370,7 +371,8 @@ public synchronized void reloadAllocations() throws IOException, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, - configuredQueues, reservableQueues, nonPreemptableQueues); + configuredQueues, reservableQueues, nonPreemptableQueues, + parents, defaultSchedPolicy); } // Load placement policy and pass it configured queues @@ -443,9 +445,12 @@ private void loadQueue(String parentName, Element element, Map> resAcls, Map> configuredQueues, Set reservableQueues, - Set nonPreemptableQueues) + Set nonPreemptableQueues, + Map parents, + SchedulingPolicy defaultSchedPolicy) throws AllocationConfigurationException { String queueName = element.getAttribute("name").trim(); + parents.put(queueName, parentName); if (queueName.contains(".")) { throw new AllocationConfigurationException("Bad fair scheduler config " @@ -510,6 +515,8 @@ private void loadQueue(String parentName, Element element, || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); SchedulingPolicy policy = SchedulingPolicy.parse(text); + checkIfParentPoliceAllow(policy, parentName, queuePolicies, parents, + defaultSchedPolicy); queuePolicies.put(queueName, policy); } else if ("aclSubmitApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData(); @@ -545,7 +552,7 @@ private void loadQueue(String parentName, Element element, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, resAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, parents, defaultSchedPolicy); isLeaf = false; } } @@ -578,7 +585,39 @@ private void loadQueue(String parentName, Element element, minQueueResources.get(queueName))); } } - + + /** + * Recursive check whether parent policies of a queue are allowed. + * @param policy the policy of current queue + * @param parentName the name of parent queue + * @param queuePolicies a map of queue and its policy + * @param parents a map of queue and the name of its parent + * @throws AllocationConfigurationException if parent policy is not allowed. + */ + private void checkIfParentPoliceAllow(SchedulingPolicy policy, + String parentName, Map queuePolicies, + Map parents, SchedulingPolicy defaultSchedPolicy) + throws AllocationConfigurationException { + if (parentName == null) { + return; + } + + Set policies = policy.allowParentPolicies(); + SchedulingPolicy parentPolicy = queuePolicies.get(parentName); + if (parentPolicy == null) { + parentPolicy = defaultSchedPolicy; + } + + if (policies.contains(parentPolicy.getName())) { + checkIfParentPoliceAllow(policy, parents.get(parentName), + queuePolicies, parents, defaultSchedPolicy); + } else { + throw new AllocationConfigurationException("Parent scheduling policy " + + "can only be in " + policies + " if the " + + "scheduling policy of current queue is" + policy.getName()); + } + } + public interface Listener { public void onReload(AllocationConfiguration info); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index aeadcf6..79a53e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import javax.xml.parsers.ParserConfigurationException; @@ -170,7 +171,29 @@ private FSQueue getQueue( } return queue; } - + + /** + * Recursively check whether parent policies of a queue are allowed. + * @param parent the parent queue + * @param policy the policy of current queue + */ + private void checkIfParentPoliceAllowed(FSQueue parent, + SchedulingPolicy policy) throws AllocationConfigurationException { + if (parent == null ) { + return; + } + + Set policies = policy.allowParentPolicies(); + + if (policies.contains(parent.getPolicy().getName())) { + checkIfParentPoliceAllowed(parent.getParent(), parent.getPolicy()); + } else { + throw new AllocationConfigurationException("Parent scheduling policy " + + "can only be in " + policies + " if the " + + "scheduling policy of current queue is" + policy.getName()); + } + } + /** * Creates a leaf or parent queue based on what is specified in 'queueType' * and places it in the tree. Creates any parents that don't already exist. @@ -216,6 +239,14 @@ private FSQueue createQueue(String name, FSQueueType queueType) { // Now that we know everything worked out, make all the queues // and add them to the map. AllocationConfiguration queueConf = scheduler.getAllocationConfiguration(); + try { + checkIfParentPoliceAllowed(parent, + queueConf.getDefaultSchedulingPolicy()); + } catch (AllocationConfigurationException ex) { + LOG.error(ex.toString()); + return null; + } + FSLeafQueue leafQueue = null; for (int i = newQueueNames.size()-1; i >= 0; i--) { String queueName = newQueueNames.get(i); @@ -407,6 +438,18 @@ private String ensureRootPrefix(String name) { public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist synchronized (queues) { + // Set scheduling policies for root and root.default + for (FSQueue queue : queues.values()) { + try { + SchedulingPolicy policy = + queueConf.getSchedulingPolicy(queue.getName()); + queue.setPolicy(policy); + } catch (AllocationConfigurationException ex) { + LOG.warn("Cannot apply configured scheduling policy to queue " + + queue.getName(), ex); + } + } + for (String name : queueConf.getConfiguredQueues().get( FSQueueType.LEAF)) { if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 9eda46c..0ee677d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Comparator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Public @@ -191,4 +192,9 @@ public abstract boolean checkIfUsageOverFairShare( public abstract Resource getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable); + /** + * Return all allowed parent policies based on the current policy. + * @return a set of allowed parent policy class name + */ + public abstract Set allowParentPolicies(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index ad41b11..100523f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -20,6 +20,8 @@ import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -203,4 +205,11 @@ private int compareShares(ResourceWeights shares1, ResourceWeights shares2, return 0; } } + + @Override + public Set allowParentPolicies() { + Set policies = new HashSet<>(); + policies.add(DominantResourceFairnessPolicy.NAME); + return policies; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 6aa8405..e107f31 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -154,4 +156,12 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; } + + @Override + public Set allowParentPolicies() { + Set policies = new HashSet<>(); + policies.add(FairSharePolicy.NAME); + policies.add(DominantResourceFairnessPolicy.NAME); + return policies; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index d3fdcf6..26738e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -129,4 +131,14 @@ public Resource getHeadroom(Resource queueFairShare, public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; } + + @Override + public Set allowParentPolicies() { + Set policies = new HashSet<>(); + // Fifo policy can only be in a leaf queue, so it is not allowed + // as a parent queue policy. + policies.add(FairSharePolicy.NAME); + policies.add(DominantResourceFairnessPolicy.NAME); + return policies; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index cc91ef9..1b4223f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -557,6 +557,33 @@ public void testQueueNameContainingPeriods() throws Exception { } /** + * Verify whether scheduling policy of parent queue is allowed. + */ + @Test (expected = AllocationConfigurationException.class) + public void testParentPolicyAllowed() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + } + + /** * Verify that you can't have the queue name with whitespace only in the * allocations file. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index a9b27a1..1157fd9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -33,12 +33,13 @@ private FairSchedulerConfiguration conf; private QueueManager queueManager; private Set notEmptyQueues; + private AllocationConfiguration allocConf; @Before public void setUp() throws Exception { conf = new FairSchedulerConfiguration(); FairScheduler scheduler = mock(FairScheduler.class); - AllocationConfiguration allocConf = new AllocationConfiguration(conf); + allocConf = new AllocationConfiguration(conf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getConf()).thenReturn(conf); SystemClock clock = SystemClock.getInstance(); @@ -140,4 +141,15 @@ private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLea allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues)); queueMgr.updateAllocationConfiguration(allocConf); } + + @Test + public void testCreateChildQueueWithInvalidPolicy() + throws AllocationConfigurationException { + allocConf.setDefaultSchedulingPolicy(SchedulingPolicy.parse("drf")); + + queueManager.getQueue("root").setPolicy(SchedulingPolicy.parse("fair")); + FSLeafQueue child = queueManager.getLeafQueue("root.child", true); + assertNull("Child queue is not allowed to be 'drf' policy if" + + " its parent have the 'fair' policy ", child); + } }