diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 3f8ed5552a4..12aff02ca10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -63,9 +63,14 @@ public static void recordRejectedAppActivityFromLeafQueue( SchedulerApplicationAttempt application, Priority priority, String diagnostic) { String type = "app"; - recordActivity(activitiesManager, node, application.getQueueName(), - application.getApplicationId().toString(), priority, - ActivityState.REJECTED, diagnostic, type); + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + recordActivity(activitiesManager, node, application.getQueueName(), + application.getApplicationId().toString(), priority, + ActivityState.REJECTED, diagnostic, type); + } finishSkippedAppAllocationRecording(activitiesManager, application.getApplicationId(), ActivityState.REJECTED, diagnostic); } @@ -203,8 +208,13 @@ public static void finishSkippedAppAllocationRecording( public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - recordActivity(activitiesManager, node, parentQueueName, queueName, null, - state, diagnostic, null); + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + recordActivity(activitiesManager, node, parentQueueName, queueName, + null, state, diagnostic, null); + } } } @@ -266,13 +276,10 @@ public static void startNodeUpdateRecording( private static void recordActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentName, String childName, Priority priority, ActivityState state, String diagnostic, String type) { - if (activitiesManager == null) { - return; - } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), - parentName, childName, priority != null ? priority.toString() : null, - state, diagnostic, type); - } + + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName, + childName, priority != null ? priority.toString() : null, state, + diagnostic, type); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 013a5ac720f..0850119d55f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1026,6 +1026,8 @@ public CSAssignment assignContainers(Resource clusterResource, return CSAssignment.NULL_ASSIGNMENT; } + Map userLimits = new HashMap<>(); + boolean needAssignToQueueCheck = true; for (Iterator assignmentIterator = orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext(); ) { @@ -1035,24 +1037,50 @@ public CSAssignment assignContainers(Resource clusterResource, node.getNodeID(), SystemClock.getInstance().getTime(), application); // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), - currentResourceLimits, application.getCurrentReservation(), - schedulingMode)) { - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, application, application.getPriority(), - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; + Resource rsrv = application.getCurrentReservation(); + if (needAssignToQueueCheck) { + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + currentResourceLimits, rsrv, schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } + // If there was no reservation and canAssignToThisQueue returned + // true, there is no reason to check further. + if (!this.reservationsContinueLooking + || rsrv.equals(Resources.none())) { + needAssignToQueueCheck = false; + } } + CachedUserLimit cul = userLimits.get(application.getUser()); + Resource cachedUserLimit = null; + if (cul != null) { + cachedUserLimit = cul.userLimit; + } Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, ps.getPartition(), schedulingMode); - + clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit); + if (cul == null) { + cul = new CachedUserLimit(userLimit); + userLimits.put(application.getUser(), cul); + } // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, ps.getPartition(), currentResourceLimits)) { + boolean userAssignable = true; + if (!cul.canAssign && Resources.fitsIn(rsrv, cul.reservation)) { + userAssignable = false; + } else { + userAssignable = canAssignToUser(clusterResource, application.getUser(), + userLimit, application, node.getPartition(), currentResourceLimits); + if (!userAssignable && Resources.fitsIn(cul.reservation, rsrv)) { + cul.canAssign = false; + cul.reservation = rsrv; + } + } + if (!userAssignable) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( @@ -1127,7 +1155,7 @@ public boolean accept(Resource cluster, // check user-limit Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p, - allocation.getSchedulingMode()); + allocation.getSchedulingMode(), null); // Deduct resources that we can release Resource usedResource = Resources.clone(getUser(username).getUsed(p)); @@ -1332,19 +1360,20 @@ private void setQueueResourceLimitsInfo( @Lock({LeafQueue.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource clusterResource, String nodePartition, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, Resource userLimit) { String user = application.getUser(); User queueUser = getUser(user); // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also - Resource userLimit = - getResourceLimitForActiveUsers(application.getUser(), clusterResource, - nodePartition, schedulingMode); - + if (userLimit == null) { + userLimit = getResourceLimitForActiveUsers(application.getUser(), + clusterResource, nodePartition, schedulingMode); + } setQueueResourceLimitsInfo(clusterResource); Resource headroom = + metrics.getUserMetrics(user) == null ? Resources.none() : getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, userLimit, nodePartition); @@ -1352,7 +1381,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" - + queueUser.getUsed() + " headroom=" + headroom + " partition=" + + queueUser.getUsed() + " partition=" + nodePartition); } @@ -1713,7 +1742,7 @@ public void updateClusterResource(Resource clusterResource, .getSchedulableEntities()) { computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); } } finally { writeLock.unlock(); @@ -2052,4 +2081,14 @@ public void stopQueue() { public Set getAllUsers() { return this.getUsersManager().getUsers().keySet(); } + + static class CachedUserLimit { + final Resource userLimit; + boolean canAssign = true; + Resource reservation = Resources.none(); + + CachedUserLimit(Resource userLimit) { + this.userLimit = userLimit; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 3c6e6dfb673..570cd24725a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -33,10 +33,14 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -50,6 +54,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -131,6 +136,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -156,9 +162,14 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; @@ -3492,6 +3503,7 @@ public void testApplicationHeadRoom() throws Exception { rm.stop(); } + @Test public void testHeadRoomCalculationWithDRC() throws Exception { // test with total cluster resource of 20GB memory and 20 vcores. @@ -4074,6 +4086,139 @@ public void testCSReservationWithRootUnblocked() throws Exception { rm.stop(); } + @Test (timeout = 300000) + public void testUserLimitThroughput() throws Exception { + // Since this is more of a performance unit test, only run if + // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true) + Assume.assumeTrue(Boolean.valueOf(System.getProperty("RunUserLimitThroughput"))); + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration conf = new YarnConfiguration(csconf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue qb = (LeafQueue)cs.getQueue("default"); + + // For now make user limit large so we can activate all applications + qb.setUserLimitFactor((float)100.0); + qb.setupConfigurableCapacities(); + + SchedulerEvent addAppEvent; + SchedulerEvent addAttemptEvent; + Container container = mock(Container.class); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + + final int APPS = 100; + ApplicationId[] appids = new ApplicationId[APPS]; + RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[APPS]; + ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[APPS]; + RMAppImpl[] apps = new RMAppImpl[APPS]; + RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[APPS]; + for (int i=0;i loggers=LogManager.getCurrentLoggers(); + loggers.hasMoreElements(); ) { + Logger logger = (Logger) loggers.nextElement(); + logger.setLevel(Level.WARN); + } + final int TOPN = 20; + final int ITERATIONS = 2000000; + final int PRINTINTERVAL = 20000; + final float F = 1000.0f * PRINTINTERVAL; + PriorityQueue queue = new PriorityQueue<>(TOPN, Collections.reverseOrder()); + + long n = Time.monotonicNow(); + long timespent = 0; + for (int i = 0; i < ITERATIONS; i+=2) { + if (i > 0 && i % PRINTINTERVAL == 0){ + long ts = (Time.monotonicNow() - n); + if (queue.size() < TOPN) { + queue.offer(ts); + } else { + Long last = queue.peek(); + if (last > ts) { + queue.poll(); + queue.offer(ts); + } + } + System.out.println(i + " " + (F / ts)); + n= Time.monotonicNow(); + } + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + } + timespent=0; + int entries = queue.size(); + while(queue.size() > 0){ + long l = queue.poll(); + timespent += l; + } + System.out.println("Avg of fastest " + entries + ": " + + F / (timespent / entries)); + } + @Test public void testCSQueueBlocked() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 4417132a774..2864d7fd7f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1146,7 +1146,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); //maxqueue 16G, userlimit 13G, - 4G used = 9G assertEquals(9*GB,app_0.getHeadroom().getMemorySize()); @@ -1169,7 +1169,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); assertEquals(8*GB, qb.getUsedResources().getMemorySize()); assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize()); @@ -1219,7 +1219,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); assertEquals(4*GB, qb.getUsedResources().getMemorySize()); //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both) assertEquals(5*GB, app_3.getHeadroom().getMemorySize()); @@ -1240,9 +1240,9 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); //app3 is user1, active from last test case