diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 87a2a00..ab2a66a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -550,9 +550,8 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
     // lock the leafqueue while we scan applications and unreserve
     synchronized (qT.leafQueue) {
-      NavigableSet<FiCaSchedulerApp> ns =
-        (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
-      Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+      Iterator<FiCaSchedulerApp> desc =
+        qT.leafQueue.getSchedulingOrder().getPreemptionIterator();
       qT.actuallyPreempted = Resources.clone(resToObtain);
       while (desc.hasNext()) {
         FiCaSchedulerApp fc = desc.next();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c86c0ff..351d271 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1473,7 +1473,7 @@ public synchronized void removeQueue(String queueName)
     // at this point we should have no more apps
     if (disposableLeafQueue.getNumApplications() > 0) {
       throw new SchedulerDynamicEditException("The queue " + queueName
-          + " is not empty " + disposableLeafQueue.getApplications().size()
+          + " is not empty " + disposableLeafQueue.getNumActiveApplications()
           + " active apps " + disposableLeafQueue.pendingApplications.size()
           + " pending apps");
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 102e553..504b52d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -48,6 +48,9 @@
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
+
+
 import com.google.common.collect.ImmutableSet;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -116,7 +119,11 @@
   @Private
   public static final String MAXIMUM_ALLOCATION_VCORES =
     "maximum-allocation-vcores";
-
+
+  public static final String ORDERING_POLICY = "ordering-policy";
+
+  public static final String DEFAULT_ORDERING_POLICY = "fifo";
+
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@@ -373,6 +380,18 @@ public int getUserLimit(String queue) {
       DEFAULT_USER_LIMIT);
     return userLimit;
   }
+
+  @SuppressWarnings("unchecked")
+  public <S extends SchedulableProcess> SchedulingOrder<S> getSchedulingOrder(
+      String queue) {
+
+    String policyConfig = get(getQueuePrefix(queue) + ORDERING_POLICY,
+        DEFAULT_ORDERING_POLICY);
+
+    SchedulingOrder<S> schedulingOrder = new SchedulingOrder();
+    schedulingOrder.configurePolicy(policyConfig);
+    return schedulingOrder;
+  }
 
   public void setUserLimit(String queue, int userLimit) {
     setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3e5405d..e428ee4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -93,7 +94,6 @@
   private int nodeLocalityDelay;
 
-  Set<FiCaSchedulerApp> activeApplications;
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
     new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -121,6 +121,10 @@
   private volatile ResourceLimits currentResourceLimits = null;
 
+  private SchedulingOrder<FiCaSchedulerApp> schedulingOrder =
+    new SchedulingOrder<FiCaSchedulerApp>(
+      new FifoOrderingPolicy<FiCaSchedulerApp>());
+
   public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
@@ -137,7 +141,6 @@ public LeafQueue(CapacitySchedulerContext cs,
       cs.getApplicationComparator();
     this.pendingApplications =
       new TreeSet<FiCaSchedulerApp>(applicationComparator);
-    this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
     setupQueueConfigs(cs.getClusterResource());
   }
 
@@ -159,6 +162,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
     setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+
+    setSchedulingOrder(conf.getSchedulingOrder(getQueuePath()));
+
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -322,7 +328,7 @@ public synchronized int getNumPendingApplications() {
   }
 
   public synchronized int getNumActiveApplications() {
-    return activeApplications.size();
+    return schedulingOrder.getNumSchedulableProcesses();
   }
 
   @Private
@@ -637,7 +643,7 @@ private synchronized void activateApplications() {
         }
       }
       user.activateApplication();
-      activeApplications.add(application);
+      schedulingOrder.addSchedulableProcess(application);
       queueUsage.incAMUsed(application.getAMResource());
       user.getResourceUsage().incAMUsed(application.getAMResource());
       i.remove();
@@ -686,7 +692,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue)
   public synchronized void removeApplicationAttempt(
       FiCaSchedulerApp application, User user) {
-    boolean wasActive = activeApplications.remove(application);
+    boolean wasActive =
+      schedulingOrder.removeSchedulableProcess(application);
     if (!wasActive) {
       pendingApplications.remove(application);
     } else {
@@ -755,7 +762,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " #applications=" + activeApplications.size());
+        + " #applications="
+        + schedulingOrder.getNumSchedulableProcesses());
     }
 
     // if our queue cannot access this node, just return
@@ -775,9 +783,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
       }
     }
 
-    // Try to assign containers to applications in order
-    for (FiCaSchedulerApp application : activeApplications) {
-
+    // Try to assign containers to applications in scheduling order
+    for (Iterator<FiCaSchedulerApp> assignmentIterator =
+        schedulingOrder.getAssignmentIterator();
+        assignmentIterator.hasNext();) {
+      FiCaSchedulerApp application = assignmentIterator.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("pre-assignContainers for application "
           + application.getApplicationId());
@@ -1542,6 +1552,9 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
     if (allocatedContainer == null) {
       return Resources.none();
     }
+
+    // Inform the ordering policy
+    schedulingOrder.containerAllocated(application, allocatedContainer);
 
     // Inform the node
     node.allocateContainer(allocatedContainer);
@@ -1642,6 +1655,10 @@ public void completedContainer(Resource clusterResource,
         removed =
           application.containerCompleted(rmContainer, containerStatus,
             event, node.getPartition());
+
+        // Inform the ordering policy
+        schedulingOrder.containerReleased(application, rmContainer);
+
         node.releaseContainer(container);
       }
@@ -1749,7 +1766,8 @@ public synchronized void updateClusterResource(Resource clusterResource,
     activateApplications();
 
     // Update application properties
-    for (FiCaSchedulerApp application : activeApplications) {
+    for (FiCaSchedulerApp application :
+        schedulingOrder.getSchedulableProcesses()) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
           Resources.none(), null);
@@ -1855,19 +1873,19 @@ public void recoverContainer(Resource clusterResource,
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
-  
+
   /**
    * Obtain (read-only) collection of active applications.
   */
-  public Set<FiCaSchedulerApp> getApplications() {
-    // need to access the list of apps from the preemption monitor
-    return activeApplications;
+  public Collection<FiCaSchedulerApp> getApplications() {
+    return schedulingOrder.getSchedulableProcesses();
   }
 
   // return a single Resource capturing the overal amount of pending resources
   public synchronized Resource getTotalResourcePending() {
     Resource ret = BuilderUtils.newResource(0, 0);
-    for (FiCaSchedulerApp f : activeApplications) {
+    for (FiCaSchedulerApp f :
+        schedulingOrder.getSchedulableProcesses()) {
       Resources.addTo(ret, f.getTotalPendingRequests());
     }
     return ret;
@@ -1879,7 +1897,8 @@ public synchronized void collectSchedulerApplications(
     for (FiCaSchedulerApp pendingApp : pendingApplications) {
       apps.add(pendingApp.getApplicationAttemptId());
     }
-    for (FiCaSchedulerApp app : activeApplications) {
+    for (FiCaSchedulerApp app :
+        schedulingOrder.getSchedulableProcesses()) {
       apps.add(app.getApplicationAttemptId());
     }
   }
@@ -1932,6 +1951,19 @@ public void setMaxApplications(int maxApplications) {
     this.maxApplications = maxApplications;
   }
 
+  public synchronized SchedulingOrder<FiCaSchedulerApp>
+      getSchedulingOrder() {
+    return schedulingOrder;
+  }
+
+  public synchronized void setSchedulingOrder(
+      SchedulingOrder<FiCaSchedulerApp> schedulingOrder) {
+    schedulingOrder.addAllSchedulableProcesses(
+      this.schedulingOrder.getSchedulableProcesses()
+    );
+    this.schedulingOrder = schedulingOrder;
+  }
+
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index f1e1e8c..152c718 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -92,6 +92,7 @@ protected void render(Block html) {
       _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
       _("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
       _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
+      _("Ordering Policy:", lqinfo.getSchedulingOrderInfo()).
"disabled" : "enabled"); html._(InfoBlock.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index 5258b3d..fda2467 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -39,6 +39,7 @@ protected ResourceInfo usedAMResource; protected ResourceInfo userAMResourceLimit; protected boolean preemptionDisabled; + protected String orderingPolicyInfo; CapacitySchedulerLeafQueueInfo() { }; @@ -57,6 +58,7 @@ usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed()); userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit()); preemptionDisabled = q.getPreemptionDisabled(); + orderingPolicyInfo = q.getSchedulingOrder().getInfo(); } public int getNumActiveApplications() { @@ -107,4 +109,8 @@ public ResourceInfo getUserAMResourceLimit() { public boolean getPreemptionDisabled() { return preemptionDisabled; } + + public String getSchedulingOrderInfo() { + return orderingPolicyInfo; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8f5237e..a8f8a9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -38,6 +39,8 @@ import static org.mockito.Mockito.when; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Comparator; @@ -46,6 +49,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Iterator; import java.util.Map; import java.util.NavigableSet; 
 import java.util.Random;
@@ -1032,7 +1036,7 @@ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
     when(lq.getTotalResourcePending()).thenReturn(
       Resource.newInstance(pending[i], 0));
     // consider moving where CapacityScheduler::comparator accessible
-    NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
+    final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
         @Override
         public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
@@ -1056,6 +1060,13 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
         .thenReturn(appAttemptIdList);
     }
     when(lq.getApplications()).thenReturn(qApps);
+    SchedulingOrder so = mock(SchedulingOrder.class);
+    when(so.getPreemptionIterator()).thenAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+        return qApps.descendingIterator();
+      }
+    });
+    when(lq.getSchedulingOrder()).thenReturn(so);
     if(setAMResourcePercent != 0.0f){
       when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 1ca5c97..61c2e06 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -155,6 +155,7 @@ private FiCaSchedulerApp getMockApplication(int appId, String user,
     doReturn(applicationAttemptId).
       when(application).getApplicationAttemptId();
     doReturn(user).when(application).getUser();
     doReturn(amResource).when(application).getAMResource();
+    when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
     return application;
   }
 
@@ -469,7 +470,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_0));
+    assertTrue(queue.getApplications().contains(app_0));
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
@@ -479,7 +480,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_1));
+    assertTrue(queue.getApplications().contains(app_1));
 
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
@@ -508,7 +509,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertFalse(queue.pendingApplications.contains(app_2));
-    assertFalse(queue.activeApplications.contains(app_2));
+    assertFalse(queue.getApplications().contains(app_2));
 
     // Finish 1st application, app_3 should become active
     queue.finishApplicationAttempt(app_0, A);
@@ -516,9 +517,9 @@ public void testActiveLimitsWithKilledApps() throws Exception {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_3));
+    assertTrue(queue.getApplications().contains(app_3));
     assertFalse(queue.pendingApplications.contains(app_3));
-    assertFalse(queue.activeApplications.contains(app_0));
+    assertFalse(queue.getApplications().contains(app_0));
 
     // Finish 2nd application
     queue.finishApplicationAttempt(app_1, A);
@@ -526,7 +527,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.activeApplications.contains(app_1));
+    assertFalse(queue.getApplications().contains(app_1));
 
     // Finish 4th application
     queue.finishApplicationAttempt(app_3, A);
@@ -534,7 +535,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.activeApplications.contains(app_3));
+    assertFalse(queue.getApplications().contains(app_3));
   }
 
   @Test
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 972cabb..c34c302 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -73,6 +73,11 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulingOrder;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableProcess;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -381,6 +386,41 @@ public void testUserQueueAcl() throws Exception {
     d.submitApplicationAttempt(app_1, user_d); // same user
   }
 
+  @Test
+  public void testPolicyConfiguration() throws Exception {
+
+    CapacitySchedulerConfiguration testConf =
+        new CapacitySchedulerConfiguration();
+
+    String tproot = CapacitySchedulerConfiguration.ROOT + "." +
+      "testPolicyRoot" + System.currentTimeMillis();
+
+    SchedulingOrder<FiCaSchedulerApp> schedOrder =
+      testConf.getSchedulingOrder(tproot);
+
+    // test for the expected default: a single fifo policy
+    FifoOrderingPolicy<FiCaSchedulerApp> fcomp =
+      (FifoOrderingPolicy<FiCaSchedulerApp>) schedOrder.getOrderingPolicy();
+
+    // override the default policy with a compound one
+    String policyConfigKey = CapacitySchedulerConfiguration.PREFIX + tproot
+        + "." + CapacitySchedulerConfiguration.ORDERING_POLICY;
+
+    // set multiple fifos (just for testing); this will result in a
+    // compound policy containing multiple fifos
+    testConf.set(policyConfigKey, "fifo+fifo");
+    schedOrder = testConf.getSchedulingOrder(tproot);
+    CompoundOrderingPolicy<FiCaSchedulerApp> ccomp =
+      (CompoundOrderingPolicy<FiCaSchedulerApp>) schedOrder.getOrderingPolicy();
+
+    List<OrderingPolicy<FiCaSchedulerApp>> policies =
+      ccomp.getPolicies();
+    assertEquals(2, policies.size());
+    for (OrderingPolicy<FiCaSchedulerApp> policy : policies) {
+      assertTrue(policy instanceof FifoOrderingPolicy);
+    }
+  }
 
   @Test
   public void testAppAttemptMetrics() throws Exception {
@@ -2011,7 +2051,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
     e.submitApplicationAttempt(app_2, user_e); // same user
 
     // before reinitialization
-    assertEquals(2, e.activeApplications.size());
+    assertEquals(2, e.getNumActiveApplications());
     assertEquals(1, e.pendingApplications.size());
 
     csConf.setDouble(CapacitySchedulerConfiguration
@@ -2028,7 +2068,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
     root.reinitialize(newRoot, csContext.getClusterResource());
 
     // after reinitialization
-    assertEquals(3, e.activeApplications.size());
+    assertEquals(3, e.getNumActiveApplications());
     assertEquals(0, e.pendingApplications.size());
   }
 
@@ -2092,7 +2132,7 @@ public void testActivateApplicationByUpdatingClusterResource()
     e.submitApplicationAttempt(app_2, user_e); // same user
 
     // before updating cluster resource
-    assertEquals(2, e.activeApplications.size());
+    assertEquals(2, e.getNumActiveApplications());
     assertEquals(1, e.pendingApplications.size());
 
     Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32);
@@ -2100,7 +2140,7 @@ public void testActivateApplicationByUpdatingClusterResource()
       new ResourceLimits(clusterResource));
 
     // after updating cluster resource
-    assertEquals(3, e.activeApplications.size());
+    assertEquals(3, e.getNumActiveApplications());
     assertEquals(0, e.pendingApplications.size());
   }
 
@@ -2451,6 +2491,84 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
           + "forget to set off-switch request should be handled");
     }
   }
+
+  @Test
+  public void testFifoAssignment() throws Exception {
+
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    a.setSchedulingOrder(
+        new SchedulingOrder<FiCaSchedulerApp>(
+            new FifoOrderingPolicy<FiCaSchedulerApp>()));
+
+    String host_0_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
+
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(
+        numNodes * (16*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user_0 = "user_0";
+
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_1, user_0);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
+
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    // even though it already holds more resources, app_0 will still be
+    // assigned first
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // and only then will app_1 receive its next container
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource));
+    Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+
+  }
 
   @Test
   public void testConcurrentAccess() throws Exception {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index eb42679..9d5b5d6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -347,7 +347,7 @@ private void verifySubQueue(JSONObject info, String q,
     int numExpectedElements = 13;
     boolean isParentQueue = true;
     if (!info.has("queues")) {
-      numExpectedElements = 25;
+      numExpectedElements = 26;
       isParentQueue = false;
     }
     assertEquals("incorrect number of elements", numExpectedElements, info.length());
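
Reviewer note: the new org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy package (SchedulingOrder, OrderingPolicy, FifoOrderingPolicy, CompoundOrderingPolicy, SchedulableProcess) is referenced throughout but is not part of this diff. Below is a minimal sketch of what those types must look like, inferred purely from the call sites in LeafQueue, ProportionalCapacityPreemptionPolicy, and the tests; field names and method bodies are assumptions, not the actual implementation. configurePolicy — which would parse "fifo" or the plus-separated compound form into FifoOrderingPolicy/CompoundOrderingPolicy instances — is omitted for brevity.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeSet;

interface SchedulableProcess<S extends SchedulableProcess<S>> {
  // defines submission ("input") order; FiCaSchedulerApp would implement this
  int compareInputOrderTo(S other);
}

interface OrderingPolicy<S extends SchedulableProcess<S>>
    extends Comparator<S> {
}

class FifoOrderingPolicy<S extends SchedulableProcess<S>>
    implements OrderingPolicy<S> {
  @Override
  public int compare(S s1, S s2) {
    return s1.compareInputOrderTo(s2); // oldest submission first
  }
}

class SchedulingOrder<S extends SchedulableProcess<S>> {
  private final TreeSet<S> processes;
  private final OrderingPolicy<S> policy;

  SchedulingOrder(OrderingPolicy<S> policy) {
    this.policy = policy;
    this.processes = new TreeSet<S>(policy);
  }

  // assignment visits applications most-deserving first...
  Iterator<S> getAssignmentIterator() {
    return processes.iterator();
  }

  // ...while preemption visits them in the reverse order, so the app that
  // would be scheduled last is preempted first
  Iterator<S> getPreemptionIterator() {
    return processes.descendingIterator();
  }

  void addSchedulableProcess(S s) { processes.add(s); }
  boolean removeSchedulableProcess(S s) { return processes.remove(s); }
  void addAllSchedulableProcesses(Collection<S> c) { processes.addAll(c); }
  int getNumSchedulableProcesses() { return processes.size(); }
  Collection<S> getSchedulableProcesses() { return processes; }

  // hooks LeafQueue calls on allocation/release; a usage-sensitive policy
  // would re-sort here, while a FIFO policy can simply ignore the events
  void containerAllocated(S s, Object container) { }
  void containerReleased(S s, Object container) { }

  OrderingPolicy<S> getOrderingPolicy() { return policy; }
  String getInfo() { return policy.getClass().getSimpleName(); }
}
```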
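TestApplicationLimits stubs compareInputOrderTo with thenCallRealMethod(), which implies FiCaSchedulerApp (or a superclass such as SchedulerApplicationAttempt) now carries a concrete implementation. A plausible sketch — the delegation to ApplicationId is an assumption, chosen because ApplicationId is Comparable and increases monotonically with submission order, which is exactly the "input order" FifoOrderingPolicy needs:

```java
// Hypothetical method on FiCaSchedulerApp; its implementation is not shown
// in this diff. ApplicationId ordering tracks submission order, so comparing
// by it yields FIFO semantics.
public int compareInputOrderTo(FiCaSchedulerApp other) {
  return getApplicationId().compareTo(other.getApplicationId());
}
```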
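For operators, the patch boils down to one new per-queue setting: getQueuePrefix(queue) + ORDERING_POLICY, defaulting to "fifo". A usage sketch of the key it introduces — the queue path "root.default" is only an example, and the plus-separated form is what testPolicyConfiguration exercises to obtain a CompoundOrderingPolicy:

```java
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

// resolves to: yarn.scheduler.capacity.root.default.ordering-policy = fifo
conf.set(CapacitySchedulerConfiguration.PREFIX + "root.default."
    + CapacitySchedulerConfiguration.ORDERING_POLICY, "fifo");

// "fifo+fifo" would instead build a CompoundOrderingPolicy of two fifos
SchedulingOrder<FiCaSchedulerApp> order =
    conf.getSchedulingOrder("root.default");
```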