diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index d707c85..e22c648 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -598,12 +598,10 @@ boolean canContainerBePreempted(RMContainer container) { // Check if the app's allocation will be over its fairshare even // after preempting this container - Resource currentUsage = getResourceUsage(); - Resource fairshare = getFairShare(); - Resource overFairShareBy = Resources.subtract(currentUsage, fairshare); - - return (Resources.fitsIn(container.getAllocatedResource(), - overFairShareBy)); + Resource usageAfterPreemption = Resources.subtract( + getResourceUsage(), container.getAllocatedResource()); + return !fsQueue.getPolicy().checkIfUsageUnderFairShare( + usageAfterPreemption, getFairShare()); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 9eda46c..7b9ec40 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -25,9 +25,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; - - import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import java.util.Collection; import java.util.Comparator; @@ -36,8 +35,9 @@ @Public @Evolving public abstract class SchedulingPolicy { - private static final ConcurrentHashMap, SchedulingPolicy> instances = - new ConcurrentHashMap, SchedulingPolicy>(); + private static final + ConcurrentHashMap, SchedulingPolicy> + instances = new ConcurrentHashMap<>(); public static final SchedulingPolicy DEFAULT_POLICY = getInstance(FairSharePolicy.class); @@ -48,10 +48,14 @@ public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate public static final byte DEPTH_ANY = (byte) 7; + private Resource clusterCapacity; + /** - * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz + * Returns a {@link SchedulingPolicy} instance corresponding to the passed + * class */ - public static SchedulingPolicy getInstance(Class clazz) { + public static SchedulingPolicy getInstance( + Class clazz) { SchedulingPolicy policy = ReflectionUtils.newInstance(clazz, null); SchedulingPolicy policyRet = instances.putIfAbsent(clazz, policy); if(policyRet != null) { @@ -98,7 +102,9 @@ public static SchedulingPolicy parse(String policy) return getInstance(clazz); } - public void initialize(Resource clusterCapacity) {} + public void initialize(Resource clusterCapacity) { + this.clusterCapacity = Resources.clone(clusterCapacity); + } /** * The {@link ResourceCalculator} returned by this method should be used @@ -174,8 +180,24 @@ public abstract void computeSteadyShares( * @param fairShare {@link Resource} the fair share * @return true if check passes (is over) or false otherwise */ - public abstract boolean checkIfUsageOverFairShare( - Resource usage, Resource fairShare); + public final boolean checkIfUsageOverFairShare( + Resource usage, Resource fairShare) { + return Resources.greaterThan( + getResourceCalculator(), clusterCapacity, usage, fairShare); + } + + /** + * Check if the resource usage is under the fair share under this policy + * + * @param usage {@link Resource} the resource usage + * @param fairShare {@link Resource} the fair share + * @return true if check passes (is under) or false otherwise + */ + public final boolean checkIfUsageUnderFairShare( + Resource usage, Resource fairShare) { + return Resources.lessThan( + getResourceCalculator(), clusterCapacity, usage, fairShare); + } /** * Get headroom by calculating the min of clusterAvailable and @@ -191,4 +213,7 @@ public abstract boolean checkIfUsageOverFairShare( public abstract Resource getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable); + protected Resource getClusterCapacity() { + return this.clusterCapacity; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index ad41b11..dbcde24 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -47,7 +47,7 @@ public static final String NAME = "DRF"; - private static final DominantResourceFairnessComparator COMPARATOR = + private final DominantResourceFairnessComparator COMPARATOR = new DominantResourceFairnessComparator(); private static final DominantResourceCalculator CALCULATOR = new DominantResourceCalculator(); @@ -63,7 +63,7 @@ public byte getApplicableDepth() { } @Override - public Comparator getComparator() { + public DominantResourceFairnessComparator getComparator() { return COMPARATOR; } @@ -89,11 +89,6 @@ public void computeSteadyShares(Collection queues, } @Override - public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { - return !Resources.fitsIn(usage, fairShare); - } - - @Override public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable) { long queueAvailableMemory = @@ -108,22 +103,13 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, return headroom; } - @Override - public void initialize(Resource clusterCapacity) { - COMPARATOR.setClusterCapacity(clusterCapacity); - } - - public static class DominantResourceFairnessComparator implements Comparator { - private static final int NUM_RESOURCES = ResourceType.values().length; - - private Resource clusterCapacity; - - public void setClusterCapacity(Resource clusterCapacity) { - this.clusterCapacity = clusterCapacity; - } + public class DominantResourceFairnessComparator + implements Comparator { + private final int NUM_RESOURCES = ResourceType.values().length; @Override public int compare(Schedulable s1, Schedulable s2) { + Resource clusterCapacity = getClusterCapacity(); ResourceWeights sharesOfCluster1 = new ResourceWeights(); ResourceWeights sharesOfCluster2 = new ResourceWeights(); ResourceWeights sharesOfMinShare1 = new ResourceWeights(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index d47ea07..e5c541e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -170,11 +170,6 @@ public void computeSteadyShares(Collection queues, } @Override - public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { - return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare); - } - - @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 3e2cb9f..e086cae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -110,13 +110,6 @@ public void computeSteadyShares(Collection queues, } @Override - public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { - throw new UnsupportedOperationException( - "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " + - "as FifoPolicy only works for FSLeafQueue."); - } - - @Override public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable) { long queueAvailableMemory = Math.max( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index a4d69bf..0953cfd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -57,6 +57,7 @@ private static final int NODE_CAPACITY_MULTIPLE = 4; private final boolean fairsharePreemption; + private final boolean drf; // App that takes up the entire cluster private FSAppAttempt greedyApp; @@ -67,13 +68,17 @@ @Parameterized.Parameters(name = "{0}") public static Collection getParameters() { return Arrays.asList(new Object[][] { - {"FairSharePreemption", true}, - {"MinSharePreemption", false}}); + {"MinSharePreemption", 0}, + {"MinSharePreemptionWithDRF", 1}, + {"FairSharePreemption", 2}, + {"FairSharePreemptionWithDRF", 3} + }); } - public TestFairSchedulerPreemption(String name, boolean fairshare) + public TestFairSchedulerPreemption(String name, int mode) throws IOException { - fairsharePreemption = fairshare; + fairsharePreemption = (mode > 0); + drf = (mode % 2 == 1); writeAllocFile(); } @@ -146,6 +151,10 @@ private void writeAllocFile() throws IOException { out.println(""); // end of nonpreemptable queue + if (drf) { + out.println("drf" + + ""); + } out.println(""); out.close(); @@ -178,8 +187,12 @@ private void setupCluster() throws IOException { resourceManager.start(); // Create and add two nodes to the cluster - addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); - addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE); + + // Reinitialize the scheduler so DRF policy picks up cluster capacity + // TODO (YARN-6194): One shouldn't need to call this + scheduler.reinitialize(conf, resourceManager.getRMContext()); // Verify if child-1 and child-2 are preemptable FSQueue child1 = @@ -265,12 +278,14 @@ private void verifyPreemption() throws InterruptedException { } // Verify the right amount of containers are preempted from greedyApp - assertEquals(4, greedyApp.getLiveContainers().size()); + assertEquals("Incorrect number of containers on the greedy app", + 4, greedyApp.getLiveContainers().size()); sendEnoughNodeUpdatesToAssignFully(); // Verify the preempted containers are assigned to starvingApp - assertEquals(2, starvingApp.getLiveContainers().size()); + assertEquals("Starved app is not assigned the right number of containers", + 2, starvingApp.getLiveContainers().size()); } private void verifyNoPreemption() throws InterruptedException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java index a5c20c1..118f000 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -150,8 +150,10 @@ public void testCalculateShares() { Resource capacity = Resources.createResource(100, 10); ResourceType[] resourceOrder = new ResourceType[2]; ResourceWeights shares = new ResourceWeights(); - DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = - new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); + DominantResourceFairnessPolicy drfPolicy = + new DominantResourceFairnessPolicy(); + DominantResourceFairnessPolicy.DominantResourceFairnessComparator + comparator = drfPolicy.getComparator(); comparator.calculateShares(used, capacity, shares, resourceOrder, ResourceWeights.NEUTRAL);