diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1a3f804..abdd39b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -34,11 +34,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
@@ -635,12 +638,36 @@ private void preemptAMContainers(Resource clusterResource,
         Resources.addTo(skippedAMSize, c.getContainer().getResource());
         continue;
       }
+      // Skip containers allocated on labeled nodes
+      if (isLabeledContainer(c)) {
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
     return ret;
   }
+
+  /**
+   * Checks whether the given container is allocated on a labeled node.
+   *
+   * @param c the container to check
+   * @return true if the container's node carries a node label, false otherwise
+   */
+  public boolean isLabeledContainer(RMContainer c) {
+    boolean isLabeledResource = false;
+    RMContext rmcontext = scheduler.getRMContext();
+    RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
+    Map<NodeId, Set<String>> labels = lm.getNodeLabels();
+    NodeId containerNode = c.getAllocatedNode();
+    for (NodeId node : labels.keySet()) {
+      if (node.equals(containerNode)) {
+        isLabeledResource = true;
+      }
+    }
+    return isLabeledResource;
+  }
 
   /**
    * Compare by reversed priority order first, and then reversed containerId
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index ca67ef0..f10ae02 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -42,23 +42,31 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -85,11 +93,14 @@
   int appAlloc = 0;
   boolean setAMContainer = false;
+  boolean setLabelContainer = false;
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
   CapacityScheduler mCS = null;
+  RMContext rmContext = null;
+  RMNodeLabelsManager lm = null;
   CapacitySchedulerConfiguration schedConf = null;
   EventHandler<ContainerPreemptEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
@@ -130,8 +141,12 @@ public void setup() {
     mClock = mock(Clock.class);
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
+    lm = mock(RMNodeLabelsManager.class);
     schedConf = new CapacitySchedulerConfiguration();
     when(mCS.getConfiguration()).thenReturn(schedConf);
+    rmContext = mock(RMContext.class);
+    when(mCS.getRMContext()).thenReturn(rmContext);
+    when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
     rand = new Random();
     long seed = rand.nextLong();
@@ -748,6 +763,49 @@ public void testSkipAMContainer() {
   }
 
   @Test
+  public void testSkipLabeledContainer() {
+    int[][] qData = new int[][] {
+      //  /   A   B
+      { 100, 50, 50 },   // abs
+      { 100, 100, 100 }, // maxcap
+      { 100, 100, 0 },   // used
+      { 70, 20, 50 },    // pending
+      { 0, 0, 0 },       // reserved
+      { 5, 4, 1 },       // apps
+      { -1, 1, 1 },      // req granularity
+      { 2, 0, 0 },       // subqueues
+    };
+    setAMContainer = true;
+    setLabelContainer = true;
+    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
+    NodeId node = NodeId.newInstance("node1", 0);
+    Set<String> labelSet = new HashSet<String>();
+    labelSet.add("x");
+    labels.put(node, labelSet);
+    when(lm.getNodeLabels()).thenReturn(labels);
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // By skipping the AM container and the labeled container,
+    // all of the other 23 containers of appD
+    // will be preempted
+    verify(mDisp, times(23)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping the AM container and the labeled container,
+    // all of the other 23 containers of appC
+    // will be preempted
+    verify(mDisp, times(23)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since the AM containers and labeled containers of appC and appD
+    // are saved, 4 containers from appB
+    // have to be preempted.
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    setAMContainer = false;
+    setLabelContainer = false;
+  }
+
+  @Test
   public void testPreemptSkippedAMContainers() {
     int[][] qData = new int[][] {
       //  /   A   B
@@ -974,7 +1032,10 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
     for (int i = 0; i < used; i += gran) {
       if(setAMContainer && i == 0){
         cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
-      }else{
+      }else if(setLabelContainer && i == 1){
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 2));
+      }
+      else{
         cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
       }
       ++cAlloc;
@@ -996,6 +1057,9 @@ RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
     if(0 == priority){
       when(mC.isAMContainer()).thenReturn(true);
     }
+    if(2 == priority){
+      when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0));
+    }
     return mC;
   }