diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index c2735f15f19..ec09909142e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -109,6 +110,10 @@ // We will check container from reverse order, so latter submitted // application's containers will be preemptionCandidates first. 
for (RMContainer c : rmContainers.descendingSet()) { + // Skip OPPORTUNISTIC containers + if (c.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + continue; + } if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, selectedCandidates)) { // Skip already selected containers @@ -253,6 +258,11 @@ private void preemptFrom(FiCaSchedulerApp app, return; } + // Skip OPPORTUNISTIC containers + if (c.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + continue; + } + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, selectedContainers)) { continue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index c52fd957c4c..78ee1be08fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; @@ -41,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import 
java.util.stream.Collectors; /** * Identifies over utilized resources within a queue and tries to normalize @@ -218,7 +220,9 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, // ToDo: Reuse reservation selector here. - List liveContainers = new ArrayList<>(app.getLiveContainers()); + List liveContainers = app.getLiveContainers().stream() + .filter(c -> c.getExecutionType() == ExecutionType.GUARANTEED) + .collect(Collectors.toList()); sortContainers(liveContainers); if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java index 78a99881d67..db25d67ea58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -42,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class QueuePriorityContainerCandidateSelector extends 
PreemptionCandidatesSelector { @@ -216,7 +218,9 @@ private boolean canPreemptEnoughResourceForAsked(Resource requiredResource, // On each host, simply check if we could preempt containers from // lower-prioritized queues or not - List runningContainers = node.getRunningContainers(); + List runningContainers = node.getRunningContainers().stream() + .filter(c -> c.getExecutionType() == ExecutionType.GUARANTEED) + .collect(Collectors.toList()); Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR); // First of all, consider already selected containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java index bdb7e8c4b47..b9e8a839161 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -34,6 +35,7 @@ import java.util.List; import java.util.Map; import 
java.util.Set; +import java.util.stream.Collectors; public class ReservedContainerCandidatesSelector extends PreemptionCandidatesSelector { @@ -175,7 +177,9 @@ private NodeForPreemption getPreemptionCandidatesOnNode( Resource available = Resources.clone(node.getUnallocatedResource()); Resource totalSelected = Resources.createResource(0); List sortedRunningContainers = - node.getCopiedListOfRunningContainers(); + node.getCopiedListOfRunningContainers().stream() + .filter(c -> c.getExecutionType() == ExecutionType.GUARANTEED) + .collect(Collectors.toList()); List selectedContainers = new ArrayList<>(); Map killableContainers = node.getKillableContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index fa66cbc1632..cc137a9d89f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ 
-252,7 +253,16 @@ private void mockContainers(String containersConfig, FiCaSchedulerApp app, NodeId host = NodeId.newInstance(values[2], 1); String label = values[3]; String userName = "user"; - int repeat = Integer.valueOf(values[4]); + int repeat; + int repeatForOpportIndex; + if (values[4].contains(":")) { + String[] repeats = values[4].split(":"); + repeatForOpportIndex = Integer.valueOf(repeats[0]); + repeat = repeatForOpportIndex + Integer.valueOf(repeats[1]); + } else { + repeatForOpportIndex = Integer.valueOf(values[4]); + repeat = repeatForOpportIndex; + } boolean reserved = Boolean.valueOf(values[5]); if (values.length >= 7) { Resources.addTo(pending, parseResourceFromString(values[6])); @@ -262,8 +272,13 @@ private void mockContainers(String containersConfig, FiCaSchedulerApp app, } for (int i = 0; i < repeat; i++) { + ExecutionType executionType = i >= repeatForOpportIndex ? + ExecutionType.OPPORTUNISTIC : + ExecutionType.GUARANTEED; Container c = mock(Container.class); - Resources.addTo(used, res); + if (executionType == ExecutionType.GUARANTEED) { + Resources.addTo(used, res); + } when(c.getResource()).thenReturn(res); when(c.getPriority()).thenReturn(pri); SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c); @@ -275,6 +290,7 @@ private void mockContainers(String containersConfig, FiCaSchedulerApp app, when(rmc.getContainer()).thenReturn(c); when(rmc.getApplicationAttemptId()).thenReturn(attemptId); when(rmc.getQueueName()).thenReturn(queueName); + when(rmc.getExecutionType()).thenReturn(executionType); final ContainerId cId = ContainerId.newContainerId(attemptId, containerId); when(rmc.getContainerId()).thenReturn(cId); @@ -318,7 +334,7 @@ public Integer answer(InvocationOnMock invocation) throws Throwable { } LOG.debug("add container to app=" + attemptId + " res=" + res + " node=" + host + " nodeLabelExpression=" + label + " partition=" - + partition); + + partition + " executionType=" + executionType); containerId++; } @@ -487,8 +503,10 @@ 
private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container, when(node.getReservedContainer()).thenReturn(container); } else { node.getCopiedListOfRunningContainers().add(container); - Resources.subtractFrom(node.getUnallocatedResource(), - container.getAllocatedResource()); + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + Resources.subtractFrom(node.getUnallocatedResource(), + container.getAllocatedResource()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 22a241f6fec..5869c7b65e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -1072,6 +1073,7 @@ public void testRefreshPreemptionProperties() throws Exception { extends ArgumentMatcher { private final ApplicationAttemptId appAttId; private final SchedulerEventType type; + private ExecutionType executionType = null; 
IsPreemptionRequestFor(ApplicationAttemptId appAttId) { this(appAttId, MARK_CONTAINER_FOR_PREEMPTION); } @@ -1080,10 +1082,18 @@ public void testRefreshPreemptionProperties() throws Exception { this.appAttId = appAttId; this.type = type; } + IsPreemptionRequestFor(ApplicationAttemptId appAttId, + ExecutionType executionType) { + this(appAttId, MARK_CONTAINER_FOR_PREEMPTION); + this.executionType = executionType; + } @Override public boolean matches(Object o) { return appAttId.equals(((ContainerPreemptEvent)o).getAppId()) - && type.equals(((ContainerPreemptEvent)o).getType()); + && type.equals(((ContainerPreemptEvent)o).getType()) + && ( executionType == null + || executionType == ((ContainerPreemptEvent) o).getContainer() + .getExecutionType()); } @Override public String toString() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyWithOpportContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyWithOpportContainers.java new file mode 100644 index 00000000000..e984732f8b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyWithOpportContainers.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestProportionalCapacityPreemptionPolicyWithOpportContainers + extends ProportionalCapacityPreemptionPolicyMockFramework { + + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + conf.setBoolean(CapacitySchedulerConfiguration. + PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, true); + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testInterQueuePreemptionWithOpportContainers() + throws Exception { + /** + * Queue structure is: + * + * <pre>
+     *           root
+     *           /  \
+     *          a    b
+     * </pre>
+ * + */ + + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[50 100 40 30]);" + // a + "-b(=[50 100 60 40])"; // b + + String appsConfig = + //queueName\t(priority,resource,host,expression, + // #repeat[:#repeatForOpport],reserved) + "a\t(1,2,n1,,20,false);" + // app1 in a + "b\t(1,2,n1,,30,false);" + // app2 in b + "b\t(1,2,n1,,0:30,false)"; // app3 in b + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false); + policy.editSchedule(); + + // Inter-queue preemption should happen in queue b, + // 5 containers in app2 should be preempted for queue a + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2), ExecutionType.GUARANTEED))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testIntraQueuePreemptionWithOpportContainers() + throws IOException { + /** + * Queue structure is: + * + * <pre>
+     *           root
+     *           /  \
+     *          a    b
+     * </pre>
+ * + */ + + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[50 100 50 30]);" + // a + "-b(=[50 50 50 20])"; // b + + String appsConfig = + //queueName\t(priority,resource,host,expression, + // #repeat[:#repeatForOpport],reserved,pending) + "a\t(1,2,n1,,20,false);" + // app1 in a + "b\t(1,2,n1,,25:15,false);" + // app2 in b + "b\t(2,2,n1,,0,false,10)"; // app3 in b + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false); + policy.editSchedule(); + + // Intra-queue preemption should happen in Queue b, + // 5 containers in app2 should be preempted for app3 with higher priority + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2), ExecutionType.GUARANTEED))); + } + + @Test + public void testReservedContainerPreemptionWithOpportContainers() + throws IOException { + /** + * The simplest test of reserved container, Queue structure is: + * + * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+ * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 90 G and 90 O containers on two nodes, + * n1 has 45 G and 45 O, n2 has 45 G and 45 O, size of each + * container is 1. + * - B has AM container at n1, and reserves 1 container with size = 9 at n1, + * so B needs to preempt 5 G containers from A at n1 instead of randomly + * preempting from n1 and n2. + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 10 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45:45,false)" // 45 G and 45 O in n1 + + "(1,1,n2,,45:45,false);" + // 45 G and 45 O in n2 + "b\t" // app2 in b + + "(1,1,n1,,1,false)" // AM container in n1 + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 5 GUARANTEED containers preempted from app1 at n1, + // don't preempt container from other app/node + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1), ExecutionType.GUARANTEED))); + verify(mDisp, times(5)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } +} \ No newline at end of file