diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java index 056df57..a3bd3fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java @@ -291,7 +291,7 @@ public QueuePlacementRule initialize(boolean create, } return super.initialize(create, args); } - + @Override public void initializeFromXml(Element el) throws AllocationConfigurationException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 1d77a43..943bf1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.PriorityPolicy; @Public @Evolving @@ -59,12 +60,12 @@ public static SchedulingPolicy getInstance(Class cla /** * Returns {@link SchedulingPolicy} instance corresponding to the * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for - * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for - * DominantResourceFairnessPolicy. For a custom + * FairSharePolicy, "fifo" for FifoPolicy, "priority" for PriorityPolicy, or + * "drf" for DominantResourceFairnessPolicy. For a custom * {@link SchedulingPolicy}s in the RM classpath, the policy should be * canonical class name of the {@link SchedulingPolicy}. * - * @param policy canonical class name or "drf" or "fair" or "fifo" + * @param policy canonical class name or "drf" or "fair" or "fifo" or "priority" * @throws AllocationConfigurationException */ @SuppressWarnings("unchecked") @@ -77,6 +78,8 @@ public static SchedulingPolicy parse(String policy) clazz = FairSharePolicy.class; } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) { clazz = FifoPolicy.class; + } else if (text.equalsIgnoreCase(PriorityPolicy.NAME)) { + clazz = PriorityPolicy.class; } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) { clazz = DominantResourceFairnessPolicy.class; } else { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index d996944..ba93881 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -51,10 +51,7 @@ public String getName() { @Override public int compare(Schedulable s1, Schedulable s2) { - int res = s1.getPriority().compareTo(s2.getPriority()); - if (res == 0) { - res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - } + int res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); if (res == 0) { // In the rare case where jobs were submitted at the exact same time, // compare them by name (which will be the JobID) to get a deterministic diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/PriorityPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/PriorityPolicy.java new file mode 100644 index 0000000..4948f7c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/PriorityPolicy.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Comparator; + +@Private +@Unstable +public class PriorityPolicy extends SchedulingPolicy { + @VisibleForTesting + public static final String NAME = "PRIORITY"; + private PriorityComparator comparator = new PriorityComparator(); + + @Override + public String getName() { + return NAME; + } + + static class PriorityComparator implements Comparator, + Serializable { + + @Override + public int compare(Schedulable s1, Schedulable s2) { + int res = s2.getPriority().compareTo(s1.getPriority()); + if (res == 0) { + res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + } + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } + return res; + } + } + + @Override + public Comparator getComparator() { + return comparator; + } + + @Override + public void computeShares(Collection schedulables, + Resource totalResources) { + if (schedulables.isEmpty()) { + return; + } + + Schedulable highest = null; + for (Schedulable schedulable : schedulables) { + if (highest == null || + comparator.compare(schedulable, highest) > 0) { + highest = schedulable; + } + } + highest.setFairShare(Resources.clone(totalResources)); + } + + @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + throw new UnsupportedOperationException( + "PriorityPolicy doesn't support checkIfUsageOverFairshare operation, " + + "as PriorityPolicy only works for FSLeafQueue."); + } + + @Override + public byte getApplicableDepth() { + return SchedulingPolicy.DEPTH_LEAF; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index bda9564..ce80b1d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; @@ -91,12 +92,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.PriorityPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.xml.sax.SAXException; import com.google.common.collect.Sets; @@ -563,7 +566,7 @@ public void testAssignToQueue() throws Exception { RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); - + FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default", "asterix"); FSLeafQueue queue2 = scheduler.assignToQueue(rmApp2, "notdefault", "obelix"); @@ -1031,6 +1034,7 @@ else if (p.getName().equals("root.queueB")) { /** * Make sure containers are chosen to be preempted in the correct order. */ + // TODO: add test for PriorityPolicy once YARN-2098 resolved public void testChoiceOfPreemptedContainers() throws Exception { conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); @@ -1589,6 +1593,44 @@ public void testFifoWithinQueue() throws Exception { assertEquals(1, app2.getLiveContainers().size()); } + @Test + public void testPriorityWithinQueue() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(5120, 5), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + Priority p1 = Priority.newInstance(1); + Priority p2 = Priority.newInstance(2); + Priority p3 = Priority.newInstance(3); + + FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); + queue1.setPolicy(new PriorityPolicy()); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + + scheduler.handle(updateEvent); + + // TODO + // waiting on YARN-2098 + } + @Test(timeout = 3000) public void testMaxAssign() throws Exception { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index eeedb09..e3eba2b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.PriorityPolicy; import org.junit.Test; import org.mockito.Mockito; @@ -64,6 +65,11 @@ public void testParseSchedulingPolicy() sm = SchedulingPolicy.parse("fifo"); assertTrue("Invalid scheduler name", sm.getName().equals(FifoPolicy.NAME)); + + // Shortname - priority + sm = SchedulingPolicy.parse("priority"); + assertTrue("Invalid scheduler name", + sm.getName().equals(PriorityPolicy.NAME)); } /** @@ -81,12 +87,11 @@ public void testIsApplicableTo() throws AllocationConfigurationException { SchedulingPolicy policy = SchedulingPolicy.parse("fifo"); assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); - assertFalse(ERR, SchedulingPolicy.isApplicableTo( - SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_INTERMEDIATE)); - assertFalse(ERR, SchedulingPolicy.isApplicableTo( - SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_ROOT)); + assertFalse(ERR, SchedulingPolicy.isApplicableTo(policy, + SchedulingPolicy.DEPTH_INTERMEDIATE)); + assertFalse(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); - // fair policy = SchedulingPolicy.parse("fair"); assertTrue(ERR, @@ -112,7 +117,16 @@ public void testIsApplicableTo() throws AllocationConfigurationException { SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); - + + // priority + policy = SchedulingPolicy.parse("priority"); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); + assertFalse(ERR, SchedulingPolicy.isApplicableTo(policy, + SchedulingPolicy.DEPTH_INTERMEDIATE)); + assertFalse(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); + policy = Mockito.mock(SchedulingPolicy.class); Mockito.when(policy.getApplicableDepth()).thenReturn( SchedulingPolicy.DEPTH_PARENT); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java index 4636c5b..08cc091 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java @@ -54,4 +54,10 @@ public void testDRFPolicy() { testComputeShares( SchedulingPolicy.getInstance(DominantResourceFairnessPolicy.class)); } + + @Test (timeout = 1000) + public void testPriorityPolicy() { + testComputeShares( + SchedulingPolicy.getInstance(PriorityPolicy.class)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 54daf2d..cc910cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -49,8 +49,8 @@ Hadoop MapReduce Next Generation - Fair Scheduler queues based on the user name included with the request through configuration. Within each queue, a scheduling policy is used to share resources between the running apps. The default is memory-based fair sharing, - but FIFO and multi-resource with Dominant Resource Fairness can also be - configured. Queues can be arranged in a hierarchy to divide resources and + but FIFO, priority and multi-resource with Dominant Resource Fairness can also + be configured. Queues can be arranged in a hierarchy to divide resources and configured with weights to share the cluster in specific proportions. In addition to providing fair sharing, the Fair Scheduler allows assigning @@ -92,8 +92,8 @@ Hadoop MapReduce Next Generation - Fair Scheduler each queue to allow sharing the queue's resources in any which way the user wants. A custom policy can be built by extending <<>>. - FifoPolicy, FairSharePolicy (default), and DominantResourceFairnessPolicy are - built-in and can be readily used. + FifoPolicy, FairSharePolicy (default), DominantResourceFairnessPolicy and + PriorityPolicy are built-in and can be readily used. Certain add-ons are not yet supported which existed in the original (MR1) Fair Scheduler. Among them, is the use of a custom policies governing @@ -242,11 +242,13 @@ Allocation file format as many resources as a queue with the default weight. * schedulingPolicy: to set the scheduling policy of any queue. The allowed - values are "fifo"/"fair"/"drf" or any class that extends + values are "fifo"/"fair"/"drf"/"priority" or any class that extends <<>>. Defaults to "fair". If "fifo", apps with earlier submit times are given preference for containers, but apps submitted later may run concurrently if there is leftover space on the cluster after satisfying the earlier app's requests. + If "priority", apps with highest priorities are given preference for + containers. * aclSubmitApps: a list of users and/or groups that can submit apps to the queue. Refer to the ACLs section below for more info on the format of this