diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 366bad0a4f2..2943c663ec5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1106,8 +1106,9 @@ public CSAssignment assignContainers(Resource clusterResource,
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
-    Map<String, CachedUserLimit> userLimits = new HashMap<>();
     boolean needAssignToQueueCheck = true;
+    Map<String, CachedUserLimit> userLimitCache = new HashMap<>();
+
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
          orderingPolicy.getAssignmentIterator();
          assignmentIterator.hasNext(); ) {
@@ -1137,30 +1138,9 @@ public CSAssignment assignContainers(Resource clusterResource,
         }
       }
 
-      CachedUserLimit cul = userLimits.get(application.getUser());
-      Resource cachedUserLimit = null;
-      if (cul != null) {
-        cachedUserLimit = cul.userLimit;
-      }
-      Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, candidates.getPartition(), schedulingMode,
-          cachedUserLimit);
-      if (cul == null) {
-        cul = new CachedUserLimit(userLimit);
-        userLimits.put(application.getUser(), cul);
-      }
-      // Check user limit
-      boolean userAssignable = true;
-      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
-        userAssignable = false;
-      } else {
-        userAssignable = canAssignToUser(clusterResource, application.getUser(),
-            userLimit, application, node.getPartition(), currentResourceLimits);
-        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
-          cul.canAssign = false;
-          cul.reservation = appReserved;
-        }
-      }
+      boolean userAssignable = canAssignToUserWithCache(clusterResource,
+          userLimitCache, application, candidates, schedulingMode, appReserved,
+          node, currentResourceLimits);
       if (!userAssignable) {
         application.updateAMContainerDiagnostics(AMState.ACTIVATED,
             "User capacity has reached its maximum limit.");
@@ -1544,60 +1524,114 @@ public Resource getResourceLimitForAllUsers(String userName,
         clusterResource, nodePartition, schedulingMode);
   }
 
+  /**
+   * Decides whether the user owning {@code application} may be assigned
+   * resources. A {@code CachedUserLimit} per user is kept in
+   * {@code userLimitCache}: any cached limit is handed to
+   * {@code computeUserLimitAndSetHeadroom}, and {@code canAssignToUser} is
+   * skipped when {@code appReserved} fits in the reservation recorded for an
+   * earlier rejection of the same user. Everything runs under the read lock.
+   *
+   * @param clusterResource forwarded to the limit computation and check
+   * @param userLimitCache cache keyed by user name; updated in place
+   * @param application application being considered
+   * @param candidates candidate node set; only its partition is read here
+   * @param schedulingMode forwarded to the limit computation
+   * @param appReserved reservation compared against the cached rejection
+   * @param node node whose partition is passed to {@code canAssignToUser}
+   * @param currentResourceLimits limits passed to {@code canAssignToUser}
+   * @return true if the user may be assigned resources, false otherwise
+   */
+  private boolean canAssignToUserWithCache(Resource clusterResource,
+      Map<String, CachedUserLimit> userLimitCache, FiCaSchedulerApp application,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, Resource appReserved,
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+    // Lock before entering try: if lock() fails, finally must not unlock.
+    readLock.lock();
+    try {
+      CachedUserLimit cul = userLimitCache.get(application.getUser());
+      Resource cachedUserLimit = null;
+      if (cul != null) {
+        cachedUserLimit = cul.userLimit;
+      }
+      Resource userLimit = computeUserLimitAndSetHeadroom(application,
+          clusterResource, candidates.getPartition(), schedulingMode,
+          cachedUserLimit);
+      if (cul == null) {
+        cul = new CachedUserLimit(userLimit);
+        userLimitCache.put(application.getUser(), cul);
+      }
+      // Check user limit
+      boolean userAssignable;
+      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
+        userAssignable = false;
+      } else {
+        userAssignable = canAssignToUser(clusterResource, application.getUser(),
+            userLimit, application, node.getPartition(), currentResourceLimits);
+        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
+          cul.canAssign = false;
+          cul.reservation = appReserved;
+        }
+      }
+
+      return userAssignable;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Private
+  @VisibleForTesting
   protected boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
       String nodePartition, ResourceLimits currentResourceLimits) {
-    try {
-      readLock.lock();
-      User user = getUser(userName);
-      if (user == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("User " + userName + " has been removed!");
-        }
-        return false;
+    User user = getUser(userName);
+    if (user == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + userName + " has been removed!");
       }
+      return false;
+    }
 
-      currentResourceLimits.setAmountNeededUnreserve(Resources.none());
+    currentResourceLimits.setAmountNeededUnreserve(Resources.none());
 
-      // Note: We aren't considering the current request since there is a fixed
-      // overhead of the AM, but it's a > check, not a >= check, so...
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          user.getUsed(nodePartition), limit)) {
-        // if enabled, check to see if could we potentially use this node instead
-        // of a reserved node if the application has reserved containers
-        if (this.reservationsContinueLooking && nodePartition.equals(
-            CommonNodeLabelsManager.NO_LABEL)) {
-          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
-              Resources.subtract(user.getUsed(),
-                  application.getCurrentReservation()), limit)) {
+    // Note: We aren't considering the current request since there is a fixed
+    // overhead of the AM, but it's a > check, not a >= check, so...
+    if (Resources.greaterThan(resourceCalculator, clusterResource,
+        user.getUsed(nodePartition), limit)) {
+      // if enabled, check to see if could we potentially use this node instead
+      // of a reserved node if the application has reserved containers
+      if (this.reservationsContinueLooking && nodePartition.equals(
+          CommonNodeLabelsManager.NO_LABEL)) {
+        if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+            Resources
+                .subtract(user.getUsed(), application.getCurrentReservation()),
+            limit)) {
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("User " + userName + " in queue " + getQueueName()
-                  + " will exceed limit based on reservations - "
-                  + " consumed: " + user.getUsed() + " reserved: " + application
-                  .getCurrentReservation() + " limit: " + limit);
-            }
-            Resource amountNeededToUnreserve = Resources.subtract(
-                user.getUsed(nodePartition), limit);
-            // we can only acquire a new container if we unreserve first to
-            // respect user-limit
-            currentResourceLimits.setAmountNeededUnreserve(
-                amountNeededToUnreserve);
-            return true;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("User " + userName + " in queue " + getQueueName()
+                + " will exceed limit based on reservations - " + " consumed: "
+                + user.getUsed() + " reserved: " + application
+                .getCurrentReservation() + " limit: " + limit);
           }
+          Resource amountNeededToUnreserve = Resources.subtract(
+              user.getUsed(nodePartition), limit);
+          // we can only acquire a new container if we unreserve first to
+          // respect user-limit
+          currentResourceLimits.setAmountNeededUnreserve(
+              amountNeededToUnreserve);
+          return true;
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("User " + userName + " in queue " + getQueueName()
-              + " will exceed limit - " + " consumed: " + user
-              .getUsed(nodePartition) + " limit: " + limit);
-        }
-        return false;
       }
-      return true;
-    } finally {
-      readLock.unlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + userName + " in queue " + getQueueName()
+            + " will exceed limit - " + " consumed: " + user
+            .getUsed(nodePartition) + " limit: " + limit);
+      }
+      return false;
     }
+    return true;
   }
 
   private void updateSchedulerHealthForCompletedContainer(