diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1a3f804..17fd870 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -30,15 +30,19 @@ import java.util.PriorityQueue; import java.util.Set; +import org.apache.commons.collections.map.HashedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; @@ -129,6 +133,7 @@ private float 
percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; + private Map<NodeId, Set<String>> labels; public ProportionalCapacityPreemptionPolicy() { clock = new SystemClock(); @@ -168,6 +173,7 @@ public void init(Configuration config, config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); + labels = null; } @VisibleForTesting @@ -176,14 +182,48 @@ public ResourceCalculator getResourceCalculator() { } @Override - public void editSchedule(){ + public void editSchedule() { CSQueue root = scheduler.getRootQueue(); - Resource clusterResources = - Resources.clone(scheduler.getClusterResource()); + Resource clusterResources = Resources.clone(scheduler.getClusterResource()); + clusterResources = getNonLabeledResources(clusterResources); + setNodeLabels(scheduler.getRMContext().getNodeLabelManager() + .getNodeLabels()); containerBasedPreemptOrKill(root, clusterResources); } /** + * Setting Node Labels + * + * @param nodelabels + */ + public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) { + labels = nodelabels; + } + + /** + * Getting the Node labels + * + * @return NodeLabels + */ + public Map<NodeId, Set<String>> getNodeLabels() { + return labels; + } + + /** + * This method returns all non labeled resources. + * + * @param clusterResources + * @return Resources + */ + private Resource getNonLabeledResources(Resource clusterResources) { + RMContext rmcontext = scheduler.getRMContext(); + RMNodeLabelsManager lm = rmcontext.getNodeLabelManager(); + Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, + clusterResources); + return res == null ? clusterResources : res; + } + + /** * This method selects and tracks containers to be preempted. If a container * is in the target list for more than maxWaitTime it is killed. 
* @@ -593,7 +633,7 @@ private void preemptAMContainers(Resource clusterResource, * @param app * @param clusterResource * @param rsrcPreempt - * @return + * @return Set<RMContainer> Set of RMContainers */ private Set<RMContainer> preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt, @@ -635,12 +675,26 @@ private void preemptAMContainers(Resource clusterResource, Resources.addTo(skippedAMSize, c.getContainer().getResource()); continue; } + // skip Labeled resource + if(isLabeledContainer(c)){ + continue; + } ret.add(c); Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } return ret; } + + /** + * Checking if given container is a labeled container + * + * @param c + * @return true/false + */ + private boolean isLabeledContainer(RMContainer c) { + return labels.containsKey(c.getAllocatedNode()); + } /** * Compare by reversed priority order first, and then reversed containerId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index ca67ef0..cb38794 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -38,27 +38,37 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; +import static 
org.mockito.Matchers.anyString; import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Random; +import java.util.Set; import java.util.TreeSet; +import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; @@ -72,6 +82,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -85,14 +96,18 @@ int appAlloc = 0; boolean setAMContainer = false; + boolean setLabeledContainer = false; float setAMResourcePercent = 0.0f; Random rand = null; Clock mClock = null; Configuration conf = null; 
CapacityScheduler mCS = null; + RMContext rmContext = null; + RMNodeLabelsManager lm = null; CapacitySchedulerConfiguration schedConf = null; EventHandler<ContainerPreemptEvent> mDisp = null; ResourceCalculator rc = new DefaultResourceCalculator(); + Resource clusterResources = null; final ApplicationAttemptId appA = ApplicationAttemptId.newInstance( ApplicationId.newInstance(TS, 0), 0); final ApplicationAttemptId appB = ApplicationAttemptId.newInstance( @@ -130,8 +145,12 @@ public void setup() { mClock = mock(Clock.class); mCS = mock(CapacityScheduler.class); when(mCS.getResourceCalculator()).thenReturn(rc); + lm = mock(RMNodeLabelsManager.class); schedConf = new CapacitySchedulerConfiguration(); when(mCS.getConfiguration()).thenReturn(schedConf); + rmContext = mock(RMContext.class); + when(mCS.getRMContext()).thenReturn(rmContext); + when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); rand = new Random(); long seed = rand.nextLong(); @@ -746,7 +765,51 @@ public void testSkipAMContainer() { verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); setAMContainer = false; } - + + @Test + public void testIdealAllocationForLabels() { + int[][] qData = new int[][] { + // / A B + { 80, 40, 40 }, // abs + { 80, 80, 80 }, // maxcap + { 80, 80, 0 }, // used + { 70, 20, 50 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 1, 1 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + setLabeledContainer = true; + Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>(); + NodeId node = NodeId.newInstance("node1", 0); + Set<String> labelSet = new HashSet<String>(); + labelSet.add("x"); + labels.put(node, labelSet); + when(lm.getNodeLabels()).thenReturn(labels); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + // Subtracting Label X resources from cluster resources + when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( + Resources.clone(Resource.newInstance(80, 0))); + clusterResources.setMemory(100); 
+ policy.editSchedule(); + + // By skipping AM Container and Labeled container, all other 18 containers + // of appD will be + // preempted + verify(mDisp, times(18)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // By skipping AM Container and Labeled container, all other 18 containers + // of appC will be + // preempted + verify(mDisp, times(18)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // rest 4 containers from appB will be preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + setAMContainer = false; + setLabeledContainer = false; + } + @Test public void testPreemptSkippedAMContainers() { int[][] qData = new int[][] { @@ -846,7 +909,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot); - Resource clusterResources = + clusterResources = Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); when(mCS.getClusterResource()).thenReturn(clusterResources); return policy; @@ -974,7 +1037,10 @@ FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, for (int i = 0; i < used; i += gran) { if(setAMContainer && i == 0){ cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); - }else{ + }else if(setLabeledContainer && i ==1){ + cLive.add(mockContainer(appAttId, cAlloc, unit, 2)); + } + else{ cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); } ++cAlloc; @@ -996,6 +1062,9 @@ RMContainer mockContainer(ApplicationAttemptId appAttId, int id, if(0 == priority){ when(mC.isAMContainer()).thenReturn(true); } + if(2 == priority){ + when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0)); + } return mC; }