diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index bca0ea4..640cd6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; import javax.xml.parsers.ParserConfigurationException; @@ -39,13 +41,11 @@ import com.google.common.base.CharMatcher; import com.google.common.annotations.VisibleForTesting; -import java.util.Iterator; -import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; + /** * Maintains a list of queues as well as scheduling parameters for each queue, * such as guaranteed share allocations, from the fair scheduler config file. - * */ @Private @Unstable @@ -79,7 +79,7 @@ public void initialize(Configuration conf) throws IOException, // Create the default queue getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); } - + /** * Get a leaf queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a leaf queue, i.e. it already exists as a @@ -271,6 +271,16 @@ private FSQueue createNewQueues(FSQueueType queueType, FSParentQueue newParent = null; String queueName = i.next(); + // Check if parent policy is allowed + try { + SchedulingPolicy.isParentPolicyAllowed(parent.getPolicy(), + scheduler.getAllocationConfiguration(). + getSchedulingPolicy(queueName)); + } catch (AllocationConfigurationException ex) { + LOG.error("Can't create queue '" + queueName + "'.", ex); + return null; + } + // Only create a leaf queue at the very end if (!i.hasNext() && (queueType != FSQueueType.PARENT)) { FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent); @@ -480,6 +490,11 @@ private String ensureRootPrefix(String name) { public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist synchronized (queues) { + // Reinitialize scheduling policies for existing queues before creating + // any queue, since we need parent policies to determine if we can create + // child queues. + reinitSchedulingPolicies(rootQueue, queueConf); + for (String name : queueConf.getConfiguredQueues().get( FSQueueType.LEAF)) { if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { @@ -520,4 +535,30 @@ boolean isQueueNameValid(String node) { return !node.isEmpty() && node.equals(CharMatcher.WHITESPACE.trimFrom(node)); } + + /** + * Recursively reinitialize the scheduling policies of queues in preorder. Log + * an error if parent policies aren't allowed. + * + * @param queue the root of queues to be reinitialized + * @param queueConf allocation configuration + */ + public void reinitSchedulingPolicies(FSQueue queue, + AllocationConfiguration queueConf) { + try { + SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); + queue.setPolicy(policy); + if (queue.getParent() != null) { + SchedulingPolicy.isParentPolicyAllowed(queue.getParent().getPolicy(), + queue.getPolicy()); + } + + for (FSQueue child : queue.getChildQueues()) { + reinitSchedulingPolicies(child, queueConf); + } + } catch (AllocationConfigurationException ex) { + LOG.error("Cannot apply configured scheduling policy to queue " + + queue.getName(), ex); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 9eda46c..3346b78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Comparator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Public @@ -191,4 +192,27 @@ public abstract boolean checkIfUsageOverFairShare( public abstract Resource getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable); + /** + * Return allowed parent policies based on the current policy. + * + * @return policies allowed + */ + public abstract Set allowedParentPolicies(); + + /** + * Check whether the parent policy of a queue are allowed. + * + * @param parentPolicy the parent policy + * @param policy the policy of current queue + */ + public static void isParentPolicyAllowed(SchedulingPolicy parentPolicy, + SchedulingPolicy policy) throws AllocationConfigurationException { + Set policies = policy.allowedParentPolicies(); + + if (!policies.contains(parentPolicy.getClass())) { + throw new AllocationConfigurationException("Parent scheduling policy " + + "can only be in " + policies + " if the " + + "scheduling policy of current queue is " + policy.getName()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index ad41b11..a29bcd8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -20,6 +20,8 @@ import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -203,4 +205,11 @@ private int compareShares(ResourceWeights shares1, ResourceWeights shares2, return 0; } } + + @Override + public Set allowedParentPolicies() { + Set policies = new HashSet<>(); + policies.add(DominantResourceFairnessPolicy.class); + return policies; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index d47ea07..0afd1c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -178,4 +180,12 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; } + + @Override + public Set allowedParentPolicies() { + Set policies = new HashSet<>(); + policies.add(FairSharePolicy.class); + policies.add(DominantResourceFairnessPolicy.class); + return policies; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 3e2cb9f..7f21c9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -132,4 +134,14 @@ public Resource getHeadroom(Resource queueFairShare, public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; } + + @Override + public Set allowedParentPolicies() { + Set policies = new HashSet<>(); + // Fifo policy can only be in a leaf queue, so it is not allowed + // as a parent queue policy. + policies.add(FairSharePolicy.class); + policies.add(DominantResourceFairnessPolicy.class); + return policies; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b1e412b..01b2b42 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -5096,4 +5096,96 @@ public void testUpdateDemand() throws IOException { Resources.equals(bQueue.getDemand(), maxResource)); } + @Test + public void testSchedulingPolicyViolation() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + + FSQueue child1 = scheduler.getQueueManager().getQueue("child1"); + assertNull("Queue 'child1' should be null since its policy isn't allowed to" + + " be 'drf' if its parent policy is 'fair'", child1); + + // dynamic queue + FSQueue dynamicQueue = scheduler.getQueueManager(). + getLeafQueue("dynamicQueue", true); + assertNull("Dynamic queue should be null since it isn't allowed to be 'drf'" + + " policy if its parent policy is 'fair'", dynamicQueue); + + // Set child1 to 'fair' and child2 to 'drf', the reload the allocation file. + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + child1 = scheduler.getQueueManager().getQueue("child1"); + assertNotNull("Queue 'child1' should be not null since its policy is " + + "allowed to be 'fair' if its parent policy is 'fair'", child1); + } + + @Test + public void testSchedulingPolicyViolationInTheMiddleLevel() + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + + FSQueue level2 = scheduler.getQueueManager().getQueue("level2"); + assertNotNull("Queue 'level2' shouldn't be null since its policy is allowed" + + " to be 'fair' if its parent policy is 'fair'", level2); + FSQueue level3 = scheduler.getQueueManager().getQueue("level2.level3"); + assertNull("Queue 'level3' should be null since its policy isn't allowed" + + " to be 'drf' if its parent policy is 'fair'", level3); + FSQueue leaf = scheduler.getQueueManager().getQueue("level2.level3.leaf"); + assertNull("Queue 'leaf' should be null since its parent failed to create.", + leaf); + } }