From eab02ee8cae415b51babb8a598ff04e464ab2055 Mon Sep 17 00:00:00 2001 From: ananyo Date: Tue, 9 Feb 2021 09:16:55 +0530 Subject: [PATCH] YARN-10617: Intra-queue preemption goes on indefinitely when apps are in pending state due to max AM limit reached --- .../FifoIntraQueuePreemptionPlugin.java | 4 ++ ...nalCapacityPreemptionPolicyIntraQueue.java | 35 ++++++++++ ...reemptionPolicyIntraQueueFairOrdering.java | 70 +++++++++++++++++++ .../mockframework/MockApplications.java | 22 +++++- 4 files changed, 130 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 7c3abb49254..559a1a21a4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -384,6 +384,10 @@ private void getAlreadySelectedPreemptionCandidatesResource( // have an internal temp app structure to store intermediate data(priority) for (FiCaSchedulerApp app : apps) { + if( !tq.leafQueue.getOrderingPolicy() + .getSchedulableEntities().contains(app) ) { + continue; + } Resource used = app.getAppAttemptResourceUsage().getUsed(partition); Resource amUsed = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java index 80779624274..22d1bebd760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java @@ -1025,4 +1025,39 @@ public void testIntraQueuePreemptionAfterQueueDropped() new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(3)))); } + + @Test + public void testIntraQueuePreemptionNonSchedulablePendingApps() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,1,false,0,user1);" + + "a\t" + + "(1,1,n1,,99,false,0,user2);" + + "a\t" // app3, user1 in a + + "(1,1,n1,,0,false,10,user3)" + "\t34" + "\tfalse;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java index eb9d21836da..998c1bda587 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java @@ -274,4 +274,74 @@ public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser() new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testIntraQueuePreemptionFairNonSchedulablePendingApps() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,1,false,0,user1);" + + "a\t" + + "(1,1,n1,,99,false,0,user1);" + + "a\t" // app3, user1 in a + + "(1,1,n1,,0,false,10,user1)" + "\t100" + "\tfalse;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testIntraQueuePreemptionFairWithULNonSchedulablePendingApps() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100:100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100:100 100:100 100:100 1:1 0]);" + // root + "-a(=[100:100 100:100 100:100 1:1 0])"; // a + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + + "(1,1,n1,,1,false,0,user1);" + + "a\t" + + "(1,1,n1,,99,false,0,user2);" + + "a\t" // app3, user1 in a + + "(1,1,n1,,0,false,10,user2)" + "\t34" + "\tfalse;"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(eventHandler, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java index b16861257ff..74ba289d173 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java @@ -23,12 +23,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -87,19 +89,37 @@ private void init() { String containersConfig = appConfigComponents[1]; MockApplication mockApp = new MockApplication(id, containersConfig, queueName); new MockContainers(mockApp, nameToCSQueues, nodeIdToSchedulerNodes); - add(mockApp); + if(appConfigComponents.length > 3 && appConfigComponents[3] != null) { + boolean schedulable = Boolean.parseBoolean(appConfigComponents[3]); + LOG.debug("For app: " + mockApp.appAttemptId + + " isSchedulable: " + schedulable); + add(mockApp, schedulable); + } else { + add(mockApp); + } id++; } setupUserResourceUsagePerLabel(resourceCalculator, mulp); } private void add(MockApplication mockApp) { + add(mockApp, true); + } + + private void add(MockApplication mockApp, boolean schedulable) { // add to LeafQueue LeafQueue queue = (LeafQueue) nameToCSQueues.get(mockApp.queueName); queue.getApplications().add(mockApp.app); queue.getAllApplications().add(mockApp.app); when(queue.getMinimumAllocation()).thenReturn(Resource.newInstance(1,1)); when(mockApp.app.getCSLeafQueue()).thenReturn(queue); + Collection schedulableApps = + queue.getOrderingPolicy().getSchedulableEntities(); + if(schedulable) { + schedulableApps.add(mockApp.app); + } + when(queue.getOrderingPolicy().getSchedulableEntities()) + .thenReturn(schedulableApps); LOG.debug("Application mock: queue: " + mockApp.queueName + ", appId:" + mockApp.app); -- 2.23.0