diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 7bd2616..f143aa6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -408,9 +408,8 @@ public void setAverageCapacity(int avgCapacity) {
    * Initialize a {@link FSQueue} with queue-specific properties and its
    * metrics.
    * @param queue the FSQueue needed to be initialized
-   * @param scheduler the scheduler which the queue belonged to
    */
-  public void initFSQueue(FSQueue queue, FairScheduler scheduler){
+  public void initFSQueue(FSQueue queue){
     // Set queue-specific properties.
     String name = queue.getName();
     queue.setWeights(getQueueWeight(name));
@@ -419,14 +418,6 @@ public void initFSQueue(FSQueue queue, FairScheduler scheduler){
     queue.setMaxRunningApps(getQueueMaxApps(name));
     queue.setMaxAMShare(getQueueMaxAMShare(name));
     queue.setMaxChildQueueResource(getMaxChildResources(name));
-    try {
-      SchedulingPolicy policy = getSchedulingPolicy(name);
-      policy.initialize(scheduler.getClusterResource());
-      queue.setPolicy(policy);
-    } catch (AllocationConfigurationException ex) {
-      LOG.warn("Failed to set the scheduling policy "
-          + getDefaultSchedulingPolicy(), ex);
-    }
 
     // Set queue metrics.
     queue.getMetrics().setMinShare(getMinResources(name));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index f2e5086..9a129b4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -305,4 +305,24 @@ public void recoverContainer(Resource clusterResource,
     // TODO Auto-generated method stub
 
   }
+
+  /**
+   * Recursively check policies for queues in pre-order. Get queue policies
+   * from the allocation file instead of properties of {@link FSQueue} objects.
+   *
+   * @param queueConf allocation configuration
+   * @throws AllocationConfigurationException if there is any policy violation
+   */
+  public void checkPoliciesFromConf(AllocationConfiguration queueConf)
+      throws AllocationConfigurationException {
+    SchedulingPolicy policy = queueConf.getSchedulingPolicy(getName());
+
+    for (FSQueue child : getChildQueues()) {
+      policy.isChildPolicyAllowed(
+          queueConf.getSchedulingPolicy(child.getName()));
+      if (child instanceof FSParentQueue) {
+        ((FSParentQueue) child).checkPoliciesFromConf(queueConf);
+      }
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index ee4c35a..8b39133 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -104,7 +104,7 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
    */
   public void reinit(boolean recursive) {
     AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-    allocConf.initFSQueue(this, scheduler);
+    allocConf.initFSQueue(this);
     updatePreemptionVariables();
 
     if (recursive) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 934bcfd..3374f16 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -45,7 +45,7 @@
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
- * 
+ *
  */
 @Private
 @Unstable
@@ -80,7 +80,7 @@ public void initialize(Configuration conf) throws IOException,
     // Recursively reinitialize to propagate queue properties
     rootQueue.reinit(true);
   }
-  
+
   /**
    * Get a leaf queue by name, creating it if the create param is true and is necessary.
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -272,6 +272,16 @@ private FSQueue createNewQueues(FSQueueType queueType,
       FSParentQueue newParent = null;
       String queueName = i.next();
 
+      // Check if child policy is allowed
+      SchedulingPolicy policy = scheduler.getAllocationConfiguration().
+          getSchedulingPolicy(queueName);
+      try {
+        parent.getPolicy().isChildPolicyAllowed(policy);
+      } catch (AllocationConfigurationException ex) {
+        LOG.error("Can't create queue '" + queueName + "'.", ex);
+        return null;
+      }
+
       // Only create a leaf queue at the very end
       if (!i.hasNext() && (queueType != FSQueueType.PARENT)) {
         FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent);
@@ -282,6 +292,13 @@ private FSQueue createNewQueues(FSQueueType queueType,
         queue = newParent;
       }
 
+      try {
+        policy.initialize(scheduler.getClusterResource());
+        queue.setPolicy(policy);
+      } catch (AllocationConfigurationException ex) {
+        LOG.error("Failed to set the scheduling policy!", ex);
+      }
+
       parent.addChildQueue(queue);
       setChildResourceLimits(parent, queue, queueConf);
       queues.put(queue.getName(), queue);
@@ -479,6 +496,11 @@ private String ensureRootPrefix(String name) {
   public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
     // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
     synchronized (queues) {
+      // Reinitialize scheduling policies for existing queues before creating
+      // any queue, since we need parent policies to determine if we can create
+      // child queues.
+      reinitSchedulingPolicies(queueConf);
+
       for (String name : queueConf.getConfiguredQueues().get(
           FSQueueType.LEAF)) {
         if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
@@ -513,4 +535,28 @@ boolean isQueueNameValid(String node) {
     return !node.isEmpty() &&
         node.equals(CharMatcher.WHITESPACE.trimFrom(node));
   }
+
+  /**
+   * Reinitialize the scheduling policies of queues if there is no policy
+   * violation.
+   *
+   * @param queueConf allocation configuration
+   */
+  public void reinitSchedulingPolicies(AllocationConfiguration queueConf) {
+    try {
+      // Recursively check new policies for existing queues
+      rootQueue.checkPoliciesFromConf(queueConf);
+
+      // Set new policies for all queues
+      for (FSQueue queue: queues.values()) {
+        SchedulingPolicy policy =
+            queueConf.getSchedulingPolicy(queue.getName());
+        policy.initialize(scheduler.getClusterResource());
+        queue.setPolicy(policy);
+      }
+    } catch (AllocationConfigurationException ex) {
+      LOG.error("Failed to reinitialize scheduling policies for existing queues!",
+          ex);
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index 9eda46c..d7972f9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -191,4 +191,13 @@ public abstract boolean checkIfUsageOverFairShare(
   public abstract Resource getHeadroom(Resource queueFairShare,
       Resource queueUsage, Resource maxAvailable);
 
+  /**
+   * Check whether the policy of a child queue is allowed.
+   *
+   * @param childPolicy the policy of child queue
+   */
+  public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy)
+      throws AllocationConfigurationException {
+    return true;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index d47ea07..1ddcdaf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
@@ -178,4 +179,15 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_ANY;
   }
+
+  @Override
+  public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy)
+      throws AllocationConfigurationException {
+    if (childPolicy instanceof DominantResourceFairnessPolicy) {
+      throw new AllocationConfigurationException("Queue policies cannot be "
+          + DominantResourceFairnessPolicy.NAME + " if the parent policy is "
+          + getName() + ". Please choose other policies for child queues instead.");
+    }
+    return true;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 3e2cb9f..6115029 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
@@ -132,4 +133,12 @@ public Resource getHeadroom(Resource queueFairShare,
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }
+
+  @Override
+  public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy)
+      throws AllocationConfigurationException {
+    throw new AllocationConfigurationException(getName() + " policy is only for"
+        + " leaf queues. Please choose policies other than " + getName()
+        + " for parent queues.");
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index a5b2d86..f18249a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -188,7 +188,7 @@ private void setupStarvedCluster() throws IOException {
     out.println("<fairSharePreemptionTimeout>0" +
         "</fairSharePreemptionTimeout>");
     out.println("<schedulingPolicy>fair</schedulingPolicy>");
-    addChildQueue(out);
+    addChildQueue(out, "fair");
     out.println("</queue>");
 
     // DRF queue with fairshare preemption enabled
@@ -198,9 +198,10 @@
     out.println("<fairSharePreemptionTimeout>0" +
         "</fairSharePreemptionTimeout>");
     out.println("<schedulingPolicy>drf</schedulingPolicy>");
-    addChildQueue(out);
+    addChildQueue(out, "drf");
     out.println("</queue>");
-
+    out.println("<defaultQueueSchedulingPolicy>drf" +
+        "</defaultQueueSchedulingPolicy>");
     out.println("</allocations>");
     out.close();
 
@@ -227,13 +228,14 @@
     assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
   }
 
-  private void addChildQueue(PrintWriter out) {
+  private void addChildQueue(PrintWriter out, String policy) {
     // Child queue under fairshare with same settings
     out.println("<queue name=\"child\">");
     out.println("<fairSharePreemptionThreshold>1" +
         "</fairSharePreemptionThreshold>");
     out.println("<fairSharePreemptionTimeout>0" +
         "</fairSharePreemptionTimeout>");
+    out.println("<schedulingPolicy>" + policy + "</schedulingPolicy>");
     out.println("</queue>");
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index b1e412b..0ee7d0c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -109,6 +109,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
@@ -5096,4 +5097,178 @@ public void testUpdateDemand() throws IOException {
         Resources.equals(bQueue.getDemand(), maxResource));
   }
 
+  @Test
+  public void testSchedulingPolicyViolation() throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + + FSQueue child1 = scheduler.getQueueManager().getQueue("child1"); + assertNull("Queue 'child1' should be null since its policy isn't allowed to" + + " be 'drf' if its parent policy is 'fair'.", child1); + + // dynamic queue + FSQueue dynamicQueue = scheduler.getQueueManager(). + getLeafQueue("dynamicQueue", true); + assertNull("Dynamic queue should be null since it isn't allowed to be 'drf'" + + " policy if its parent policy is 'fair'.", dynamicQueue); + + // Set child1 to 'fair' and child2 to 'drf', the reload the allocation file. + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + child1 = scheduler.getQueueManager().getQueue("child1"); + assertNotNull("Queue 'child1' should be not null since its policy is " + + "allowed to be 'fair' if its parent policy is 'fair'.", child1); + + // Detect the policy violation of Child2, keep the original policy instead + // of setting the new policy. + FSQueue child2 = scheduler.getQueueManager().getQueue("child2"); + assertTrue("Queue 'child2' should be 'fair' since its new policy 'drf' " + + "is not allowed.", child2.getPolicy() instanceof FairSharePolicy); + } + + @Test + public void testSchedulingPolicyViolationInTheMiddleLevel() + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + + FSQueue level2 = scheduler.getQueueManager().getQueue("level2"); + assertNotNull("Queue 'level2' shouldn't be null since its policy is allowed" + + " to be 'fair' if its parent policy is 'fair'.", level2); + FSQueue level3 = scheduler.getQueueManager().getQueue("level2.level3"); + assertNull("Queue 'level3' should be null since its policy isn't allowed" + + " to be 'drf' if its parent policy is 'fair'.", level3); + FSQueue leaf = scheduler.getQueueManager().getQueue("level2.level3.leaf"); + assertNull("Queue 'leaf' should be null since its parent failed to create.", + leaf); + } + + @Test + public void testPolicyReinitilization() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + + // Set child1 to 'drf' which is not allowed, then 
reload the allocation file + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fifo"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + FSQueue child1 = scheduler.getQueueManager().getQueue("child1"); + assertTrue("Queue 'child1' should still be 'fair' since 'drf' isn't allowed" + + " if its parent policy is 'fair'.", + child1.getPolicy() instanceof FairSharePolicy); + FSQueue child2 = scheduler.getQueueManager().getQueue("child2"); + assertTrue("Queue 'child2' should still be 'fair' there is a policy" + + " violation while reinitialization.", + child2.getPolicy() instanceof FairSharePolicy); + + // Set both child1 and root to 'drf', then reload the allocation file + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("drf"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fifo"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + child1 = scheduler.getQueueManager().getQueue("child1"); + assertTrue("Queue 'child1' should be 'drf' since both 'root' and 'child1'" + + " are 'drf'.", + child1.getPolicy() instanceof DominantResourceFairnessPolicy); + child2 = scheduler.getQueueManager().getQueue("child2"); + assertTrue("Queue 'child2' should still be 'fifo' there is no policy" + + " violation while reinitialization.", + child2.getPolicy() instanceof FifoPolicy); + } }
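
For reference, a minimal allocation-file sketch consistent with the rules this patch enforces, assuming the standard fair scheduler allocation format already used by the tests above. The queue names ('batch', 'analytics', 'adhoc') are illustrative and not taken from the patch: fifo is restricted to leaf queues, a 'fair' parent rejects 'drf' children, and a 'drf' parent accepts 'fair', 'drf', and 'fifo' children (as exercised in testPolicyReinitialization).

<?xml version="1.0"?>
<allocations>
  <queue name="root">
    <schedulingPolicy>drf</schedulingPolicy>
    <queue name="batch">
      <!-- fifo is accepted here because 'batch' is a leaf queue. -->
      <schedulingPolicy>fifo</schedulingPolicy>
    </queue>
    <queue name="analytics">
      <schedulingPolicy>fair</schedulingPolicy>
      <!-- A 'drf' child under this 'fair' parent would be rejected. -->
      <queue name="adhoc">
        <schedulingPolicy>fair</schedulingPolicy>
      </queue>
    </queue>
  </queue>
  <defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
</allocations>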