From d2956575587563c3b513173d0d0fec82321882f6 Mon Sep 17 00:00:00 2001
From: Sunil G
Date: Mon, 24 Sep 2018 13:42:05 +0530
Subject: [PATCH] YARN-8657

---
 .../scheduler/capacity/LeafQueue.java              | 152 +++++++++++----------
 1 file changed, 83 insertions(+), 69 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ffe862fc618..5df39eb7904 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1103,8 +1103,9 @@ public CSAssignment assignContainers(Resource clusterResource,
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
-    Map<String, CachedUserLimit> userLimits = new HashMap<>();
     boolean needAssignToQueueCheck = true;
+    Map<String, CachedUserLimit> userLimitCache = new HashMap<>();
+
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
         orderingPolicy.getAssignmentIterator();
         assignmentIterator.hasNext(); ) {
@@ -1135,31 +1136,9 @@ public CSAssignment assignContainers(Resource clusterResource,
         }
       }
 
-      CachedUserLimit cul = userLimits.get(application.getUser());
-      Resource cachedUserLimit = null;
-      if (cul != null) {
-        cachedUserLimit = cul.userLimit;
-      }
-      Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, candidates.getPartition(), schedulingMode,
-          cachedUserLimit);
-      if (cul == null) {
-        cul = new CachedUserLimit(userLimit);
-        userLimits.put(application.getUser(), cul);
-      }
-      // Check user limit
-      boolean userAssignable = true;
-      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
-        userAssignable = false;
-      } else {
-        userAssignable = canAssignToUser(clusterResource, application.getUser(),
-            userLimit, application, candidates.getPartition(),
-            currentResourceLimits);
-        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
-          cul.canAssign = false;
-          cul.reservation = appReserved;
-        }
-      }
+      boolean userAssignable = canAssignToUserWithCache(clusterResource,
+          userLimitCache, application, candidates, schedulingMode, appReserved,
+          node, currentResourceLimits);
       if (!userAssignable) {
         application.updateAMContainerDiagnostics(AMState.ACTIVATED,
             "User capacity has reached its maximum limit.");
@@ -1543,60 +1522,95 @@ public Resource getResourceLimitForAllUsers(String userName,
         clusterResource, nodePartition, schedulingMode);
   }
 
-  @Private
-  protected boolean canAssignToUser(Resource clusterResource,
-      String userName, Resource limit, FiCaSchedulerApp application,
-      String nodePartition, ResourceLimits currentResourceLimits) {
+  private boolean canAssignToUserWithCache(Resource clusterResource,
+      Map<String, CachedUserLimit> userLimitCache, FiCaSchedulerApp application,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, Resource appReserved,
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
     try {
      readLock.lock();
-      User user = getUser(userName);
-      if (user == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("User " + userName + " has been removed!");
+      CachedUserLimit cul = userLimitCache.get(application.getUser());
+      Resource cachedUserLimit = null;
+      if (cul != null) {
+        cachedUserLimit = cul.userLimit;
+      }
+      Resource userLimit = computeUserLimitAndSetHeadroom(application,
+          clusterResource, candidates.getPartition(), schedulingMode,
+          cachedUserLimit);
+      if (cul == null) {
+        cul = new CachedUserLimit(userLimit);
+        userLimitCache.put(application.getUser(), cul);
+      }
+      // Check user limit
+      boolean userAssignable;
+      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
+        userAssignable = false;
+      } else {
+        userAssignable = canAssignToUser(clusterResource, application.getUser(),
+            userLimit, application, node.getPartition(), currentResourceLimits);
+        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
+          cul.canAssign = false;
+          cul.reservation = appReserved;
         }
-        return false;
       }
-      currentResourceLimits.setAmountNeededUnreserve(Resources.none());
+      return userAssignable;
+    } finally {
+      readLock.unlock();
+    }
+  }
 
-      // Note: We aren't considering the current request since there is a fixed
-      // overhead of the AM, but it's a > check, not a >= check, so...
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          user.getUsed(nodePartition), limit)) {
-        // if enabled, check to see if could we potentially use this node instead
-        // of a reserved node if the application has reserved containers
-        if (this.reservationsContinueLooking && nodePartition.equals(
-            CommonNodeLabelsManager.NO_LABEL)) {
-          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
-              Resources.subtract(user.getUsed(),
-                  application.getCurrentReservation()), limit)) {
+  @Private
+  @VisibleForTesting
+  protected boolean canAssignToUser(Resource clusterResource,
+      String userName, Resource limit, FiCaSchedulerApp application,
+      String nodePartition, ResourceLimits currentResourceLimits) {
+    User user = getUser(userName);
+    if (user == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + userName + " has been removed!");
+      }
+      return false;
+    }
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("User " + userName + " in queue " + getQueueName()
-                  + " will exceed limit based on reservations - "
-                  + " consumed: " + user.getUsed() + " reserved: " + application
-                  .getCurrentReservation() + " limit: " + limit);
-            }
-            Resource amountNeededToUnreserve = Resources.subtract(
-                user.getUsed(nodePartition), limit);
-            // we can only acquire a new container if we unreserve first to
-            // respect user-limit
-            currentResourceLimits.setAmountNeededUnreserve(
-                amountNeededToUnreserve);
-            return true;
+    currentResourceLimits.setAmountNeededUnreserve(Resources.none());
+
+    // Note: We aren't considering the current request since there is a fixed
+    // overhead of the AM, but it's a > check, not a >= check, so...
+    if (Resources.greaterThan(resourceCalculator, clusterResource,
+        user.getUsed(nodePartition), limit)) {
+      // if enabled, check to see if could we potentially use this node instead
+      // of a reserved node if the application has reserved containers
+      if (this.reservationsContinueLooking && nodePartition.equals(
+          CommonNodeLabelsManager.NO_LABEL)) {
+        if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+            Resources
+                .subtract(user.getUsed(), application.getCurrentReservation()),
+            limit)) {
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("User " + userName + " in queue " + getQueueName()
+                + " will exceed limit based on reservations - " + " consumed: "
+                + user.getUsed() + " reserved: " + application
+                .getCurrentReservation() + " limit: " + limit);
          }
+          Resource amountNeededToUnreserve = Resources.subtract(
+              user.getUsed(nodePartition), limit);
+          // we can only acquire a new container if we unreserve first to
+          // respect user-limit
+          currentResourceLimits.setAmountNeededUnreserve(
+              amountNeededToUnreserve);
+          return true;
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("User " + userName + " in queue " + getQueueName()
-              + " will exceed limit - " + " consumed: " + user
-              .getUsed(nodePartition) + " limit: " + limit);
-        }
-        return false;
       }
-      return true;
-    } finally {
-      readLock.unlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + userName + " in queue " + getQueueName()
+            + " will exceed limit - " + " consumed: " + user
+            .getUsed(nodePartition) + " limit: " + limit);
+      }
+      return false;
    }
+    return true;
  }
 
   private void updateSchedulerHealthForCompletedContainer(
-- 
2.15.2 (Apple Git-101.1)
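
A note on the scheme the new method encapsulates: the user limit is computed at most once per user per scheduling pass, and a failed user-limit check is remembered together with the reservation size that failed, so any later application of the same user whose reservation fits inside the recorded one (Resources.fitsIn(appReserved, cul.reservation)) is rejected without recomputing the limit. The standalone sketch below illustrates that caching pattern under simplified assumptions; it is not code from this patch: Resource is reduced to a long, and computeUserLimit()/canAssignToUser() here are hypothetical stand-ins for computeUserLimitAndSetHeadroom() and the real canAssignToUser().

import java.util.HashMap;
import java.util.Map;

/** Standalone sketch of the per-pass user-limit cache described above. */
public class UserLimitCacheSketch {

  /** Mirrors LeafQueue.CachedUserLimit: one entry per user per pass. */
  static final class CachedUserLimit {
    final long userLimit;      // computed once per user per scheduling pass
    boolean canAssign = true;  // flipped once an assignment is rejected
    long reservation = 0;      // largest reservation that was rejected

    CachedUserLimit(long userLimit) {
      this.userLimit = userLimit;
    }
  }

  // In the patch this map is created per assignContainers() call, so the
  // cache never outlives a single scheduling pass.
  private final Map<String, CachedUserLimit> userLimitCache = new HashMap<>();

  /** Hypothetical stand-in for the expensive limit computation. */
  private long computeUserLimit(String user) {
    return 100; // fixed limit, for illustration only
  }

  /**
   * Hypothetical stand-in for the real per-request check: usage minus the
   * app's reservation must stay within the limit, so a larger reservation
   * makes assignment easier, which is what justifies the negative cache.
   */
  private boolean canAssignToUser(long limit, long used, long appReserved) {
    return used - appReserved <= limit;
  }

  boolean canAssignToUserWithCache(String user, long used, long appReserved) {
    CachedUserLimit cul = userLimitCache.get(user);
    if (cul == null) {
      // First application of this user in the pass: compute and cache.
      cul = new CachedUserLimit(computeUserLimit(user));
      userLimitCache.put(user, cul);
    }
    // Negative cache: an app of this user with an equal or larger
    // reservation was already rejected, so this one cannot pass either.
    if (!cul.canAssign && appReserved <= cul.reservation) {
      return false;
    }
    boolean assignable = canAssignToUser(cul.userLimit, used, appReserved);
    if (!assignable && appReserved >= cul.reservation) {
      cul.canAssign = false;
      cul.reservation = appReserved; // remember the largest rejected size
    }
    return assignable;
  }

  public static void main(String[] args) {
    UserLimitCacheSketch q = new UserLimitCacheSketch();
    System.out.println(q.canAssignToUserWithCache("alice", 90, 0));   // true
    System.out.println(q.canAssignToUserWithCache("alice", 120, 10)); // false, over limit
    System.out.println(q.canAssignToUserWithCache("alice", 120, 5));  // false, short-circuited
  }
}

The direction of the two fitsIn() checks in the patch is what keeps the short-circuit safe: a rejection is cached only when the failing reservation is at least as large as the one already recorded, and only requests with an equal or smaller reservation are short-circuited, since a larger reservation can only make the real check easier to pass.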