diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 5604f0f..e1f0619 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -128,11 +128,14 @@ public long getNewContainerId() { * * @param requests resources to be acquired * @param recoverPreemptedRequest recover Resource Request on preemption + * @return true if any resource was updated, false else */ - synchronized public void updateResourceRequests( + synchronized public boolean updateResourceRequests( List requests, boolean recoverPreemptedRequest) { QueueMetrics metrics = queue.getMetrics(); + boolean anyResourcesUpdated = false; + // Update resource requests for (ResourceRequest request : requests) { Priority priority = request.getPriority(); @@ -146,6 +149,7 @@ synchronized public void updateResourceRequests( + request); } updatePendingResources = true; + anyResourcesUpdated = true; // Premature optimization? // Assumes that we won't see more than one priority request updated @@ -209,6 +213,7 @@ synchronized public void updateResourceRequests( } } } + return anyResourcesUpdated; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 0554c04..dbc3cb5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -284,11 +284,12 @@ public Queue getQueue() { return queue; } - public synchronized void updateResourceRequests( + public synchronized boolean updateResourceRequests( List requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests, false); + return appSchedulingInfo.updateResourceRequests(requests, false); } + return false; } public synchronized void recoverResourceRequests( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 48c7f2f..06d282d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -895,6 +895,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Release containers releaseContainers(release, application); + Allocation allocation; + + LeafQueue updateDemandForQueue = null; + synchronized (application) { // make sure we aren't stopping/removing the application @@ -915,8 +919,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, application.showRequests(); // Update application requests - application.updateResourceRequests(ask); - + if (application.updateResourceRequests(ask)) { + updateDemandForQueue = (LeafQueue) application.getQueue(); + } + LOG.debug("allocate: post-update"); application.showRequests(); } @@ -929,9 +935,16 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, application.updateBlacklist(blacklistAdditions, blacklistRemovals); - return application.getAllocation(getResourceCalculator(), + allocation = application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability()); } + + if (updateDemandForQueue != null) { + updateDemandForQueue.getOrderingPolicy().demandUpdated(application); + } + + return allocation; + } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index e046fcf..61c5cbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -37,6 +37,7 @@ protected TreeSet schedulableEntities; protected Comparator comparator; + protected Map entitiesToReorder = new HashMap(); public AbstractComparatorOrderingPolicy() { } @@ -47,11 +48,13 @@ public AbstractComparatorOrderingPolicy() { } @Override public Iterator getAssignmentIterator() { + reorderScheduleEntities(); return schedulableEntities.iterator(); } @Override public Iterator getPreemptionIterator() { + reorderScheduleEntities(); return schedulableEntities.descendingIterator(); } @@ -68,6 +71,22 @@ protected void reorderSchedulableEntity(S schedulableEntity) { schedulableEntities.add(schedulableEntity); } + protected void reorderScheduleEntities() { + synchronized (entitiesToReorder) { + for (Map.Entry entry : + entitiesToReorder.entrySet()) { + reorderSchedulableEntity(entry.getValue()); + } + entitiesToReorder.clear(); + } + } + + protected void entityRequiresReordering(S schedulableEntity) { + synchronized (entitiesToReorder) { + entitiesToReorder.put(schedulableEntity.getId(), schedulableEntity); + } + } + @VisibleForTesting public Comparator getComparator() { return comparator; @@ -80,6 +99,9 @@ public void addSchedulableEntity(S s) { @Override public boolean removeSchedulableEntity(S s) { + synchronized (entitiesToReorder) { + entitiesToReorder.remove(s.getId()); + } return schedulableEntities.remove(s); } @@ -105,6 +127,9 @@ public abstract void containerReleased(S schedulableEntity, RMContainer r); @Override + public abstract void demandUpdated(S schedulableEntity); + + @Override public abstract String getInfo(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java index 3ab74de..ea14b42 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -96,16 +96,23 @@ public void configure(Map conf) { @Override public void containerAllocated(S schedulableEntity, RMContainer r) { - reorderSchedulableEntity(schedulableEntity); + entityRequiresReordering(schedulableEntity); } @Override public void containerReleased(S schedulableEntity, RMContainer r) { - reorderSchedulableEntity(schedulableEntity); + entityRequiresReordering(schedulableEntity); } @Override + public void demandUpdated(S schedulableEntity) { + if (sizeBasedWeight) { + entityRequiresReordering(schedulableEntity); + } + } + + @Override public String getInfo() { String sbw = sizeBasedWeight ? " with sizeBasedWeight" : ""; return "FairOrderingPolicy" + sbw; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java index 932a5f9..74a422c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java @@ -46,7 +46,11 @@ public void containerAllocated(S schedulableEntity, public void containerReleased(S schedulableEntity, RMContainer r) { } - + + @Override + public void demandUpdated(S schedulableEntity) { + } + @Override public String getInfo() { return "FifoOrderingPolicy"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java index aebdcde..e3f67ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -102,6 +102,11 @@ public void containerReleased(S schedulableEntity, RMContainer r); /** + * Demand Updated for the passed schedulableEntity, reorder if needed. + */ + void demandUpdated(S schedulableEntity); + + /** * Display information regarding configuration & status */ public String getInfo(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index d360581..7b665e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; @@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -677,6 +680,118 @@ public void testBlackListNodes() throws Exception { } @Test + public void testAllocateReorder() throws Exception { + + //Confirm that allocation (resource request) alone will trigger a change in + //application ordering where appropriate + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + LeafQueue q = (LeafQueue) cs.getQueue("default"); + Assert.assertNotNull(q); + + FairOrderingPolicy fop = new FairOrderingPolicy(); + fop.setSizeBasedWeight(true); + q.setOrderingPolicy(fop); + + String host = "127.0.0.1"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + //add app begin + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, 1); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, "default", "user"); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + //add app end + + //add app begin + ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); + ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( + appId2, 1); + + RMAppAttemptMetrics attemptMetric2 = + new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); + RMAppImpl app2 = mock(RMAppImpl.class); + when(app2.getApplicationId()).thenReturn(appId2); + RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); + when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); + when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); + when(app2.getCurrentAppAttempt()).thenReturn(attempt2); + + rm.getRMContext().getRMApps().put(appId2, app2); + + SchedulerEvent addAppEvent2 = + new AppAddedSchedulerEvent(appId2, "default", "user"); + cs.handle(addAppEvent2); + SchedulerEvent addAttemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId2, false); + cs.handle(addAttemptEvent2); + //add app end + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); + + //This will allocate for app1 + cs.allocate(appAttemptId1, + Collections.singletonList(r1), + Collections.emptyList(), + null, null); + + //And this will result in container assignment for app1 + CapacityScheduler.schedule(cs); + + //Verify that app1 is still first in assignment order + //This happens because app2 has no demand/a magnitude of NaN, which + //results in app1 and app2 being equal in the fairness comparison and + //failling back to fifo (start) ordering + assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(), + appId1.toString()); + + //Now, allocate for app2 (this would be the first/AM allocation) + ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null); + + //In this case we do not perform container assignment because we want to + //verify re-ordering based on the allocation alone + + //Now, the first app for assignment is app2 + assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(), + appId2.toString()); + + rm.stop(); + } + + @Test public void testResourceOverCommit() throws Exception { Configuration conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,