diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 1776bd4..6173d69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAFairOrderingComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import 
org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -263,8 +266,17 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, Resource queueReassignableResource, PriorityQueue orderedByPriority) { - Comparator reverseComp = Collections - .reverseOrder(new TAPriorityComparator()); + Comparator reverseComp; + OrderingPolicy queueOrderingPolicy = + tq.leafQueue.getOrderingPolicy(); + if (queueOrderingPolicy instanceof FairOrderingPolicy + && (context.getIntraQueuePreemptionOrderPolicy() + == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { + reverseComp = Collections.reverseOrder( + new TAFairOrderingComparator(this.rc, clusterResource)); + } else { + reverseComp = Collections.reverseOrder(new TAPriorityComparator()); + } TreeSet orderedApps = new TreeSet<>(reverseComp); String partition = tq.partition; @@ -355,7 +367,14 @@ private void getAlreadySelectedPreemptionCandidatesResource( TempQueuePerPartition tq, Collection apps, Resource clusterResource, Map perUserAMUsed) { - TAPriorityComparator taComparator = new TAPriorityComparator(); + Comparator taComparator; + OrderingPolicy orderingPolicy = + tq.leafQueue.getOrderingPolicy(); + if (orderingPolicy instanceof FairOrderingPolicy) { + taComparator = new TAFairOrderingComparator(this.rc, clusterResource); + } else { + taComparator = new TAPriorityComparator(); + } PriorityQueue orderedByPriority = new PriorityQueue<>( 100, taComparator); @@ -393,13 +412,12 @@ private void getAlreadySelectedPreemptionCandidatesResource( // Set ideal allocation of app as 0. tmpApp.idealAssigned = Resources.createResource(0, 0); - orderedByPriority.add(tmpApp); - // Create a TempUserPerPartition structure to hold more information // regarding each user's entities such as UserLimit etc. This could // be kept in a user to TempUserPerPartition map for further reference. 
String userName = app.getUser(); - if (!usersPerPartition.containsKey(userName)) { + TempUserPerPartition tmpUser = usersPerPartition.get(userName); + if (tmpUser == null) { ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) .getResourceUsage(); @@ -409,7 +427,7 @@ private void getAlreadySelectedPreemptionCandidatesResource( amUsed = (userSpecificAmUsed == null) ? Resources.none() : userSpecificAmUsed; - TempUserPerPartition tmpUser = new TempUserPerPartition( + tmpUser = new TempUserPerPartition( tq.leafQueue.getUser(userName), tq.queueName, Resources.clone(userResourceUsage.getUsed(partition)), Resources.clone(amUsed), @@ -432,7 +450,10 @@ private void getAlreadySelectedPreemptionCandidatesResource( tmpUser.idealAssigned = Resources.createResource(0, 0); tq.addUserPerPartition(userName, tmpUser); } + tmpApp.setTempUserPerPartition(tmpUser); + orderedByPriority.add(tmpApp); } + return orderedByPriority; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 5b6932e..c77ec94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.Serializable; @@ -64,6 +65,36 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { } } + /* + * Order first by the owning user's getUsedDeductAM() usage, least to most. + * Ties are broken by comparing application ids. + */ + static class TAFairOrderingComparator + implements Comparator { + private ResourceCalculator rc; + private Resource clusterRes; + + TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + Resource used1 = ta1.getTempUserPerPartition().getUsedDeductAM(); + Resource used2 = ta2.getTempUserPerPartition().getUsedDeductAM(); + + if (Resources.equals(used1, used2)) { + return ta1.getApplicationId().compareTo(ta2.getApplicationId()); + } + if (Resources.lessThan(rc, clusterRes, used1, used2)) { + return -1; + } else { + return 1; + } + } + } + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; final CapacitySchedulerPreemptionContext context; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index e9a934b..05d8096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -34,6 +34,7 @@ // Following fields are settled and used by candidate selection policies private final int priority; private final ApplicationId applicationId; + private TempUserPerPartition tempUser; FiCaSchedulerApp app; @@ -102,4 +103,12 @@ public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator, Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } } + + public void setTempUserPerPartition(TempUserPerPartition tu) { + tempUser = tu; + } + + public TempUserPerPartition getTempUserPerPartition() { + return tempUser; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a8e2697..67d0eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -606,7 +607,11 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { }); when(leafQueue.getApplications()).thenReturn(apps); when(leafQueue.getAllApplications()).thenReturn(apps); - OrderingPolicy so = mock(OrderingPolicy.class); + String opName = conf.get(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q) + + ".ordering-policy", "fifo"); + OrderingPolicy so = opName.equals("fair") + ? mock(FairOrderingPolicy.class) : mock(OrderingPolicy.class); when(so.getPreemptionIterator()).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { return apps.descendingIterator(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java new file mode 100644 index 0000000..935d6cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.junit.Before; +import org.junit.Test; + +/* + * Test class for testing intra-queue preemption when the fair ordering policy + * is enabled on a capacity queue. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + /* + * When the capacity scheduler fair ordering policy is enabled, preempt first + * from the application owned by the user that is the farthest over their + * user limit. 
+ */ + @Test + public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 60 resources in queue a + // user2/app2 has 40 resources in queue a + // user3/app3 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FairOrderingPolicy enabled on queue a, all 20 resources should be + // preempted from app1 + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,20,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fifo ordering policy is enabled, preempt first + * from the youngest application until reduced to user limit, then preempt + * from next youngest app. 
+ */ + @Test + public void testIntraQueuePreemptionFifoOrderingPolicyEnabled() + throws IOException { + // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 60 resources in queue a + // user2/app2 has 40 resources in queue a + // user3/app3 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FifoOrderingPolicy enabled on queue a, the first 5 should come from + // the youngest app, app2, until app2 is reduced to the user limit of 35. + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,5,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // user1/app1 has 60 resources in queue a + // user2/app2 has 35 resources in queue a + // user3/app3 has 5 resources and is requesting 15 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // The next 15 should come from app1 even though app2 is younger since app2 + // has already been reduced to its user limit. 
+ appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,35,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,15,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fair ordering policy is enabled, preempt first + * from the youngest application from the user that is the farthest over their + * user limit. + */ + @Test + public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1 has 35 resources in app1 and 25 resources in app2 in queue a + // user2/app3 has 40 resources in queue a + // user3/app4 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each.
+ // With FairOrderingPolicy enabled on queue a, all 20 resources should be + // preempted from app2, the second app of user1 + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,35,false,0,user1);" + + "a\t" + + "(1,1,n1,,25,false,0,user1);" + + "a\t" // app3, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app4, user3 in a + + "(1,1,n1,,0,false,20,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + /* + * When the capacity scheduler fifo ordering policy is enabled and a user has + * multiple apps, preempt first from the youngest application. + */ + @Test + public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser() + throws IOException { + // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 40 resources in queue a + // user1/app2 has 20 resources in queue a + // user3/app3 has 40 resources in queue a + // user4/app4 is requesting 25 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each.
+ String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,40,false,0,user1);" + + "a\t" // app2, user1 in a + + "(1,1,n1,,20,false,0,user1);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,40,false,0,user3);" + + "a\t" // app4, user4 in a + + "(1,1,n1,,0,false,25,user4)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app3 is the youngest and also over its user limit. 5 should be preempted + // from app3 until it comes down to user3's user limit. + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + // User1's app2 is its youngest. 19 should be preempted from app2, leaving + // only the AM + verify(mDisp, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // Preempt the remaining resource from User1's oldest app1. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } +}